http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java new file mode 100644 index 0000000..614c5b7 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.flink.api.common.io.RichOutputFormat; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.configuration.Configuration; + +/** + * OutputFormat to write tuples into a database. + * The OutputFormat has to be configured using the supplied OutputFormatBuilder. + * + * @param <OUT> + * @see Tuple + * @see DriverManager + */ +public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> { + private static final long serialVersionUID = 1L; + + @SuppressWarnings("unused") + private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class); + + private String username; + private String password; + private String drivername; + private String dbURL; + private String query; + private int batchInterval = 5000; + + private Connection dbConn; + private PreparedStatement upload; + + private SupportedTypes[] types = null; + + private int batchCount = 0; + + public JDBCOutputFormat() { + } + + @Override + public void configure(Configuration parameters) { + } + + /** + * Connects to the target database and initializes the prepared statement. + * + * @param taskNumber The number of the parallel instance. + * @throws IOException Thrown, if the output could not be opened due to an + * I/O problem. 
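+	 * @param numTasks The total number of parallel task instances (not used here; each instance opens its own connection).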
+	 */
+	@Override
+	public void open(int taskNumber, int numTasks) throws IOException {
+		try {
+			establishConnection();
+			upload = dbConn.prepareStatement(query);
+		} catch (SQLException sqe) {
+			close();
+			throw new IllegalArgumentException("open() failed.", sqe);
+		} catch (ClassNotFoundException cnfe) {
+			close();
+			throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
+		}
+	}
+
+	private void establishConnection() throws SQLException, ClassNotFoundException {
+		Class.forName(drivername);
+		if (username == null) {
+			dbConn = DriverManager.getConnection(dbURL);
+		} else {
+			dbConn = DriverManager.getConnection(dbURL, username, password);
+		}
+	}
+
+	private enum SupportedTypes {
+		BOOLEAN,
+		BYTE,
+		SHORT,
+		INTEGER,
+		LONG,
+		STRING,
+		FLOAT,
+		DOUBLE
+	}
+
+	/**
+	 * Adds a record to the prepared statement.
+	 * <p>
+	 * When this method is called, the output format is guaranteed to be opened.
+	 *
+	 * @param tuple The record to add to the output.
+	 * @throws IOException Thrown, if the record could not be added due to an I/O problem.
+	 */
+	@Override
+	public void writeRecord(OUT tuple) throws IOException {
+		try {
+			if (types == null) {
+				extractTypes(tuple);
+			}
+			addValues(tuple);
+			upload.addBatch();
+			batchCount++;
+			if (batchCount >= batchInterval) {
+				upload.executeBatch();
+				batchCount = 0;
+			}
+		} catch (SQLException sqe) {
+			close();
+			throw new IllegalArgumentException("writeRecord() failed", sqe);
+		} catch (IllegalArgumentException iae) {
+			close();
+			throw new IllegalArgumentException("writeRecord() failed", iae);
+		}
+	}
+
+	private void extractTypes(OUT tuple) {
+		types = new SupportedTypes[tuple.getArity()];
+		for (int x = 0; x < tuple.getArity(); x++) {
+			types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
+		}
+	}
+
+	private void addValues(OUT tuple) throws SQLException {
+		for (int index = 0; index < tuple.getArity(); index++) {
+			switch (types[index]) {
+				case BOOLEAN:
+					upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
+					break;
+				case BYTE:
+					upload.setByte(index + 1, (Byte) tuple.getField(index));
+					break;
+				case SHORT:
+					upload.setShort(index + 1, (Short) tuple.getField(index));
+					break;
+				case INTEGER:
+					upload.setInt(index + 1, (Integer) tuple.getField(index));
+					break;
+				case LONG:
+					upload.setLong(index + 1, (Long) tuple.getField(index));
+					break;
+				case STRING:
+					upload.setString(index + 1, (String) tuple.getField(index));
+					break;
+				case FLOAT:
+					upload.setFloat(index + 1, (Float) tuple.getField(index));
+					break;
+				case DOUBLE:
+					upload.setDouble(index + 1, (Double) tuple.getField(index));
+					break;
+			}
+		}
+	}
+
+	/**
+	 * Executes the prepared statement and closes all resources of this instance.
+	 *
+	 * @throws IOException Thrown, if the output could not be closed properly.
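+	 * <p>Any records still buffered in the current batch are flushed before the statement and connection are released.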
+	 */
+	@Override
+	public void close() throws IOException {
+		try {
+			upload.executeBatch();
+			batchCount = 0;
+		} catch (SQLException se) {
+			throw new IllegalArgumentException("close() failed", se);
+		} catch (NullPointerException npe) {
+			// the format was never opened; nothing to flush
+		}
+		try {
+			upload.close();
+		} catch (SQLException se) {
+			LOG.info("Output format couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+		try {
+			dbConn.close();
+		} catch (SQLException se) {
+			LOG.info("Output format couldn't be closed - " + se.getMessage());
+		} catch (NullPointerException npe) {
+		}
+	}
+
+	public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+		return new JDBCOutputFormatBuilder();
+	}
+
+	public static class JDBCOutputFormatBuilder {
+		private final JDBCOutputFormat format;
+
+		protected JDBCOutputFormatBuilder() {
+			this.format = new JDBCOutputFormat();
+		}
+
+		public JDBCOutputFormatBuilder setUsername(String username) {
+			format.username = username;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setPassword(String password) {
+			format.password = password;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setDrivername(String drivername) {
+			format.drivername = drivername;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+			format.dbURL = dbURL;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setQuery(String query) {
+			format.query = query;
+			return this;
+		}
+
+		public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+			format.batchInterval = batchInterval;
+			return this;
+		}
+
+		/**
+		 * Finalizes the configuration and checks validity.
+		 *
+		 * @return Configured JDBCOutputFormat
+		 */
+		public JDBCOutputFormat finish() {
+			if (format.username == null) {
+				LOG.info("Username was not supplied separately.");
+			}
+			if (format.password == null) {
+				LOG.info("Password was not supplied separately.");
+			}
+			if (format.dbURL == null) {
+				throw new IllegalArgumentException("No database URL supplied.");
+			}
+			if (format.query == null) {
+				throw new IllegalArgumentException("No query supplied.");
+			}
+			if (format.drivername == null) {
+				throw new IllegalArgumentException("No driver supplied.");
+			}
+			return format;
+		}
+	}
+
+}
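A minimal configuration sketch for the builder above (the driver, URL, and target table are illustrative placeholders; the batch interval of 100 overrides the default flush threshold of 5000 buffered records):

    JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:shop")
            .setQuery("insert into orders (id, item, price) values (?,?,?)")
            .setBatchInterval(100)  // flush the JDBC batch after every 100 records
            .finish();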
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java new file mode 100644 index 0000000..7b012ba --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc.example; + +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO; +import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; +import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; + +public class JDBCExample { + + public static void main(String[] args) throws Exception { + prepareTestDb(); + + ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment(); + DataSet<Tuple5> source + = environment.createInput(JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("select * from books") + .finish(), + new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO) + ); + + source.output(JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)") + .finish()); + environment.execute(); + } + + private static void prepareTestDb() throws Exception { + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + Connection conn = DriverManager.getConnection(dbURL); + + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + 
sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + Statement stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + + sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + + sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); + sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); + sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); + sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); + sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); + + stat = conn.createStatement(); + stat.execute(sqlQueryBuilder.toString()); + stat.close(); + + conn.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java new file mode 100644 index 0000000..b76f8b8 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.Assert; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple5; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JDBCInputFormatTest { + JDBCInputFormat jdbcInputFormat; + + static Connection conn; + + static final Object[][] dbData = { + {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, + {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, + {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, + {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, + {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; + + @BeforeClass + public static void setUpClass() { + try { + prepareDerbyDatabase(); + } catch (Exception e) { + Assert.fail(); + } + } + + private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { + System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL"); + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + conn = DriverManager.getConnection(dbURL); + createTable(); + insertDataToSQLTable(); + conn.close(); + } + + private static void createTable() throws SQLException { + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE books ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + Statement stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + } + + private static void insertDataToSQLTable() throws SQLException { + StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); + sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); + sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); + sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); + sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); + + Statement stat = conn.createStatement(); + stat.execute(sqlQueryBuilder.toString()); + stat.close(); + } + + @AfterClass + public static void tearDownClass() { + cleanUpDerbyDatabases(); + } + + private static void cleanUpDerbyDatabases() { + try { + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + + conn = DriverManager.getConnection(dbURL); + Statement stat = conn.createStatement(); + stat.executeUpdate("DROP TABLE books"); + stat.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @After + public void tearDown() { + jdbcInputFormat = null; + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDriver() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + 
.setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("select * from books") + .finish(); + jdbcInputFormat.open(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidURL() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") + .setQuery("select * from books") + .finish(); + jdbcInputFormat.open(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidQuery() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("iamnotsql") + .finish(); + jdbcInputFormat.open(null); + } + + @Test(expected = IllegalArgumentException.class) + public void testIncompleteConfiguration() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setQuery("select * from books") + .finish(); + } + + @Test(expected = IOException.class) + public void testIncompatibleTuple() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("select * from books") + .finish(); + jdbcInputFormat.open(null); + jdbcInputFormat.nextRecord(new Tuple2()); + } + + @Test + public void testJDBCInputFormat() throws IOException { + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("select * from books") + .finish(); + jdbcInputFormat.open(null); + Tuple5 tuple = new Tuple5(); + int recordCount = 0; + while (!jdbcInputFormat.reachedEnd()) { + jdbcInputFormat.nextRecord(tuple); + Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); + Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); + Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); + Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); + Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); + + for (int x = 0; x < 5; x++) { + Assert.assertEquals(dbData[recordCount][x], tuple.getField(x)); + } + recordCount++; + } + Assert.assertEquals(5, recordCount); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java new file mode 100644 index 0000000..7d004f9 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +import org.junit.Assert; + +import org.apache.flink.api.java.tuple.Tuple5; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class JDBCOutputFormatTest { + private JDBCInputFormat jdbcInputFormat; + private JDBCOutputFormat jdbcOutputFormat; + + private static Connection conn; + + static final Object[][] dbData = { + {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11}, + {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22}, + {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 33}, + {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44}, + {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}}; + + @BeforeClass + public static void setUpClass() throws SQLException { + try { + System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL"); + prepareDerbyDatabase(); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + Assert.fail(); + } + } + + private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException { + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + conn = DriverManager.getConnection(dbURL); + createTable("books"); + createTable("newbooks"); + insertDataToSQLTables(); + conn.close(); + } + + private static void createTable(String tableName) throws SQLException { + StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE "); + sqlQueryBuilder.append(tableName); + sqlQueryBuilder.append(" ("); + sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,"); + sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,"); + sqlQueryBuilder.append("price FLOAT DEFAULT NULL,"); + sqlQueryBuilder.append("qty INT DEFAULT NULL,"); + sqlQueryBuilder.append("PRIMARY KEY (id))"); + + Statement stat = conn.createStatement(); + stat.executeUpdate(sqlQueryBuilder.toString()); + stat.close(); + } + + private static void insertDataToSQLTables() throws SQLException { + StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, title, author, price, qty) VALUES "); + sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah Teck', 11.11, 11),"); + sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah Teck', 22.22, 22),"); + sqlQueryBuilder.append("(1003, 'More Java for more dummies', 'Mohammad Ali', 33.33, 33),"); + sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 44),"); + sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin Jones', 55.55, 55)"); + + Statement stat = conn.createStatement(); + stat.execute(sqlQueryBuilder.toString()); + stat.close(); + } + + @AfterClass + 
public static void tearDownClass() { + cleanUpDerbyDatabases(); + } + + private static void cleanUpDerbyDatabases() { + try { + String dbURL = "jdbc:derby:memory:ebookshop;create=true"; + Class.forName("org.apache.derby.jdbc.EmbeddedDriver"); + + conn = DriverManager.getConnection(dbURL); + Statement stat = conn.createStatement(); + stat.executeUpdate("DROP TABLE books"); + stat.executeUpdate("DROP TABLE newbooks"); + stat.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(); + } + } + + @After + public void tearDown() { + jdbcOutputFormat = null; + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidDriver() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.idontexist") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidURL() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:der:iamanerror:mory:ebookshop") + .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidQuery() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("iamnotsql") + .finish(); + jdbcOutputFormat.open(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testIncompleteConfiguration() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .finish(); + } + + + @Test(expected = IllegalArgumentException.class) + public void testIncompatibleTypes() throws IOException { + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDrivername("org.apache.derby.jdbc.EmbeddedDriver") + .setDBUrl("jdbc:derby:memory:ebookshop") + .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)") + .finish(); + jdbcOutputFormat.open(0, 1); + + Tuple5 tuple5 = new Tuple5(); + tuple5.setField(4, 0); + tuple5.setField("hello", 1); + tuple5.setField("world", 2); + tuple5.setField(0.99, 3); + tuple5.setField("imthewrongtype", 4); + + jdbcOutputFormat.writeRecord(tuple5); + jdbcOutputFormat.close(); + } + + @Test + public void testJDBCOutputFormat() throws IOException { + String sourceTable = "books"; + String targetTable = "newbooks"; + String driverPath = "org.apache.derby.jdbc.EmbeddedDriver"; + String dbUrl = "jdbc:derby:memory:ebookshop"; + + jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat() + .setDBUrl(dbUrl) + .setDrivername(driverPath) + .setQuery("insert into " + targetTable + " (id, title, author, price, qty) values (?,?,?,?,?)") + .finish(); + jdbcOutputFormat.open(0, 1); + + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(driverPath) + .setDBUrl(dbUrl) + .setQuery("select * from " + sourceTable) + .finish(); + jdbcInputFormat.open(null); + + Tuple5 tuple = new Tuple5(); + while (!jdbcInputFormat.reachedEnd()) { + jdbcInputFormat.nextRecord(tuple); + 
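// pipe each row read from the source table straight into the output format under test +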
jdbcOutputFormat.writeRecord(tuple); + } + + jdbcOutputFormat.close(); + jdbcInputFormat.close(); + + jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat() + .setDrivername(driverPath) + .setDBUrl(dbUrl) + .setQuery("select * from " + targetTable) + .finish(); + jdbcInputFormat.open(null); + + int recordCount = 0; + while (!jdbcInputFormat.reachedEnd()) { + jdbcInputFormat.nextRecord(tuple); + Assert.assertEquals("Field 0 should be int", Integer.class, tuple.getField(0).getClass()); + Assert.assertEquals("Field 1 should be String", String.class, tuple.getField(1).getClass()); + Assert.assertEquals("Field 2 should be String", String.class, tuple.getField(2).getClass()); + Assert.assertEquals("Field 3 should be float", Double.class, tuple.getField(3).getClass()); + Assert.assertEquals("Field 4 should be int", Integer.class, tuple.getField(4).getClass()); + + for (int x = 0; x < 5; x++) { + Assert.assertEquals(dbData[recordCount][x], tuple.getField(x)); + } + + recordCount++; + } + Assert.assertEquals(5, recordCount); + + jdbcInputFormat.close(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..2fb9345 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties @@ -0,0 +1,19 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +log4j.rootLogger=OFF \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml new file mode 100644 index 0000000..8b3bb27 --- /dev/null +++ b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml @@ -0,0 +1,29 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/pom.xml ---------------------------------------------------------------------- diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml new file mode 100644 index 0000000..25f98d1 --- /dev/null +++ b/flink-batch-connectors/pom.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + + <artifactId>flink-batch-connectors</artifactId> + <name>flink-batch-connectors</name> + <packaging>pom</packaging> + + <modules> + <module>flink-avro</module> + <module>flink-jdbc</module> + <module>flink-hadoop-compatibility</module> + <module>flink-hbase</module> + <module>flink-hcatalog</module> + </modules> + +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/pom.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/pom.xml b/flink-contrib/flink-tez/pom.xml new file mode 100644 index 0000000..412640a --- /dev/null +++ b/flink-contrib/flink-tez/pom.xml @@ -0,0 +1,224 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-contrib-parent</artifactId> + <version>1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-tez</artifactId> + <name>flink-tez</name> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-runtime</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-optimizer</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-examples-batch</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-tests</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.flink</groupId> + <artifactId>${shading-artifact.name}</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-api</artifactId> + <version>${tez.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-common</artifactId> + <version>${tez.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + <artifactId>tez-dag</artifactId> + <version>${tez.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.tez</groupId> + 
<artifactId>tez-runtime-library</artifactId> + <version>${tez.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.4</version> + </dependency> + + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <version>2.4.1</version> + <configuration> + <descriptors> + <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor> + </descriptors> + <archive> + <manifest> + <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass> + </manifest> + </archive> + </configuration> + <executions> + <execution> + <!--<id>assemble-all</id>--> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml new file mode 100644 index 0000000..504761a --- /dev/null +++ b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml @@ -0,0 +1,42 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd"> + <id>flink-fat-jar</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <!--<excludes> + <exclude>org.apache.flink:*</exclude> + </excludes>--> + <useTransitiveFiltering>true</useTransitiveFiltering> + <unpack>true</unpack> + <scope>runtime</scope> + <excludes> + <exclude>com.google.guava:guava</exclude> + </excludes> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java new file mode 100644 index 0000000..4c091e5 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.client; + +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.ExecutionEnvironmentFactory; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.DataStatistics; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; + +public class LocalTezEnvironment extends ExecutionEnvironment { + + TezExecutor executor; + Optimizer compiler; + + private LocalTezEnvironment() { + compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration()); + executor = new TezExecutor(compiler, this.getParallelism()); + } + + public static LocalTezEnvironment create() { + return new LocalTezEnvironment(); + } + + @Override + public JobExecutionResult execute(String jobName) throws Exception { + TezConfiguration tezConf = new TezConfiguration(); + tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true); + tezConf.set("fs.defaultFS", "file:///"); + tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true); + executor.setConfiguration(tezConf); + return executor.executePlan(createProgramPlan(jobName)); + } + + @Override + public String getExecutionPlan() throws Exception { + Plan p = createProgramPlan(null, false); + return executor.getOptimizerPlanAsJSON(p); + } + + public void setAsContext() { + ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { + @Override + public ExecutionEnvironment createExecutionEnvironment() { + return LocalTezEnvironment.this; + } + }; + initializeContextEnvironment(factory); + } + + @Override + public void startNewSession() throws Exception { + throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez."); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java new file mode 100644 index 0000000..131937e --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class RemoteTezEnvironment extends ExecutionEnvironment {
+
+	private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class);
+
+	private Optimizer compiler;
+	private TezExecutor executor;
+	private Path jarPath = null;
+
+
+	public void registerMainClass(Class mainClass) {
+		jarPath = new Path(ClassUtil.findContainingJar(mainClass));
+		LOG.info("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString());
+	}
+
+	@Override
+	public JobExecutionResult execute(String jobName) throws Exception {
+		TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan());
+		if (jarPath != null) {
+			tool.setJobJar(jarPath);
+		}
+		// Let execution failures propagate instead of swallowing them with a return in a finally block.
+		ToolRunner.run(new Configuration(), tool, new String[]{jobName});
+		return new JobExecutionResult(null, -1, null);
+	}
+
+	@Override
+	public String getExecutionPlan() throws Exception {
+		Plan p = createProgramPlan(null, false);
+		return executor.getOptimizerPlanAsJSON(p);
+	}
+
+	public static RemoteTezEnvironment create() {
+		return new RemoteTezEnvironment();
+	}
+
+	public RemoteTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
+		executor = new TezExecutor(compiler, getParallelism());
+	}
+
+	@Override
+	public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java new file mode 100644 index 0000000..60449db --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java @@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.tez.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.Plan; +import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator; +import org.apache.flink.tez.dag.TezDAGGenerator; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.tez.client.TezClient; +import org.apache.tez.client.TezClientUtils; +import org.apache.tez.common.TezCommonUtils; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.client.DAGClient; +import org.apache.tez.dag.api.client.DAGStatus; + +import java.util.Map; +import java.util.TreeMap; + +public class TezExecutor extends PlanExecutor { + + private static final Log LOG = LogFactory.getLog(TezExecutor.class); + + private TezConfiguration tezConf; + private Optimizer compiler; + + private Path jarPath; + + private long runTime = -1; //TODO get DAG execution time from Tez + private int parallelism; + + public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) { + this.tezConf = tezConf; + this.compiler = compiler; + this.parallelism = parallelism; + } + + public TezExecutor(Optimizer compiler, int parallelism) { + this.tezConf = null; + this.compiler = compiler; + this.parallelism = parallelism; + } + + public void setConfiguration (TezConfiguration tezConf) { + this.tezConf = tezConf; + } + + private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception { + + String jobName = plan.getJobName(); + + TezClient tezClient = TezClient.create(jobName, tezConf); + tezClient.start(); + try { + OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism); + TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration()); + DAG dag = dagGenerator.createDAG(optPlan); + + if (jarPath != null) { + addLocalResource(tezConf, jarPath, dag); + } + + tezClient.waitTillReady(); + LOG.info("Submitting DAG to Tez Client"); + DAGClient dagClient = tezClient.submitDAG(dag); + + LOG.info("Submitted DAG to Tez Client"); + + // monitoring + DAGStatus dagStatus = dagClient.waitForCompletion(); + + if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) { + LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics()); + throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics()); + } + LOG.info(jobName + " finished successfully"); + + return new JobExecutionResult(null, runTime, null); + + } + finally { + tezClient.stop(); + } + } + + @Override + public void start() throws Exception { + throw new IllegalStateException("Session management is not supported in the TezExecutor."); + } + + @Override + public void stop() throws Exception { + throw new 
IllegalStateException("Session management is not supported in the TezExecutor."); + } + + @Override + public void endSession(JobID jobID) throws Exception { + throw new IllegalStateException("Session management is not supported in the TezExecutor."); + } + + @Override + public boolean isRunning() { + return false; + } + + @Override + public JobExecutionResult executePlan(Plan plan) throws Exception { + return executePlanWithConf(tezConf, plan); + } + + private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) { + + try { + org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf); + + LOG.info("Jar path received is " + jarPath.toString()); + + String jarFile = jarPath.getName(); + + Path remoteJarPath = null; + + /* + if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) { + LOG.info("Tez staging directory is null, setting it."); + Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString()); + LOG.info("Setting Tez staging directory to " + stagingDir.toString()); + tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString()); + LOG.info("Set Tez staging directory to " + stagingDir.toString()); + } + Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR)); + LOG.info("Ensuring that Tez staging directory exists"); + TezClientUtils.ensureStagingDirExists(tezConf, stagingDir); + LOG.info("Tez staging directory exists and is " + stagingDir.toString()); + */ + + + Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf); + LOG.info("Tez staging path is " + stagingDir); + TezClientUtils.ensureStagingDirExists(tezConf, stagingDir); + LOG.info("Tez staging dir exists"); + + remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile)); + LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString()); + fs.copyFromLocalFile(jarPath, remoteJarPath); + + + FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath); + Credentials credentials = new Credentials(); + TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf); + + Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>(); + LocalResource jobJar = LocalResource.newInstance( + ConverterUtils.getYarnUrlFromPath(remoteJarPath), + LocalResourceType.FILE, LocalResourceVisibility.APPLICATION, + remoteJarStatus.getLen(), remoteJarStatus.getModificationTime()); + localResources.put(jarFile.toString(), jobJar); + + dag.addTaskLocalFiles(localResources); + + LOG.info("Added job jar as local resource."); + } + catch (Exception e) { + System.out.println(e.getMessage()); + e.printStackTrace(); + System.exit(-1); + } + } + + public void setJobJar (Path jarPath) { + this.jarPath = jarPath; + } + + + @Override + public String getOptimizerPlanAsJSON(Plan plan) throws Exception { + OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism); + PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator(); + return jsonGen.getOptimizerPlanAsJSON(optPlan); + } + + public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException { + if (parallelism > 0 && p.getDefaultParallelism() <= 0) { + p.setDefaultParallelism(parallelism); + } + return this.compiler.compile(p); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java new file mode 100644 index 0000000..09289fb --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.client; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.flink.api.common.Plan; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Tool; +import org.apache.tez.dag.api.TezConfiguration; + + +public class TezExecutorTool extends Configured implements Tool { + + private static final Log LOG = LogFactory.getLog(TezExecutorTool.class); + + private TezExecutor executor; + private Plan plan; + private Path jarPath = null; + + public TezExecutorTool(TezExecutor executor, Plan plan) { + this.executor = executor; + this.plan = plan; + } + + public void setJobJar(Path jarPath) { + this.jarPath = jarPath; + } + + @Override + public int run(String[] args) throws Exception { + + Configuration conf = getConf(); + + TezConfiguration tezConf; + if (conf != null) { + tezConf = new TezConfiguration(conf); + } else { + tezConf = new TezConfiguration(); + } + + UserGroupInformation.setConfiguration(tezConf); + + executor.setConfiguration(tezConf); + + try { + if (jarPath != null) { + executor.setJobJar(jarPath); + } + executor.executePlan(plan); + } + catch (Exception e) { + LOG.error("Job execution failed due to: " + e.getMessage(), e); + // Preserve the original exception as the cause instead of discarding the stack trace. + throw new RuntimeException("Job execution failed.", e); + } + return 0; + } +}
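Taken together, the two client classes form the complete submission path: TezExecutor compiles a Flink Plan into an OptimizedPlan, translates it into a Tez DAG via TezDAGGenerator, and submits it through TezClient, while TezExecutorTool wraps the executor as a Hadoop Tool so that generic options (-D, -conf) reach the TezConfiguration built in run(). The following driver is a minimal sketch of how they might be wired up, not code from this commit; the Optimizer construction, the example job, and the jar path are illustrative assumptions.

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.tez.client.TezExecutor;
import org.apache.flink.tez.client.TezExecutorTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;

public class TezDriverSketch {

	public static void main(String[] args) throws Exception {
		// Assemble a Flink program as usual; the plan is extracted rather than executed locally.
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.readTextFile(args[0]).writeAsText(args[1]); // trivial copy job for illustration
		Plan plan = env.createProgramPlan("flink-on-tez-sketch");

		// Assumption: the batch optimizer is built the way Flink clients of this era build it.
		Optimizer compiler = new Optimizer(
				new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());

		TezExecutor executor = new TezExecutor(compiler, 4); // default parallelism 4
		TezExecutorTool tool = new TezExecutorTool(executor, plan);
		tool.setJobJar(new Path("/path/to/flink-tez-job.jar")); // hypothetical fat-jar location

		// ToolRunner parses generic options into the Hadoop Configuration
		// that run() subsequently wraps in a TezConfiguration.
		System.exit(ToolRunner.run(new Configuration(), tool, args));
	}
}

Routing submission through ToolRunner rather than calling executePlan() directly keeps configuration handling in one place: whatever reaches run() as a Hadoop Configuration is exactly what TezExecutor uses for DAG generation, jar staging, and submission.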
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java new file mode 100644 index 0000000..6597733 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.flink.tez.util.FlinkSerialization; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; + +import java.util.HashMap; +import java.util.Map; + +public class FlinkBroadcastEdge extends FlinkEdge { + + public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { + super(source, target, typeSerializer); + } + + @Override + public Edge createEdge(TezConfiguration tezConf) { + + Map<String,String> serializerMap = new HashMap<String,String>(); + serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); + + try { + UnorderedKVEdgeConfig edgeConfig = + (UnorderedKVEdgeConfig + .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName()) + .setFromConfiguration(tezConf) + .setKeySerializationClass(WritableSerialization.class.getName(), null) + .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap) + .configureInput() + .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)) + ) + .done() + .build(); + + EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty(); + this.cached = Edge.create(source.getVertex(), target.getVertex(), property); + return cached; + + } catch (Exception e) { + throw new CompilerException( + "An error occurred while creating a Tez Broadcast Edge: " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java new file mode 100644 index 0000000..e3ddb9e --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.runtime.DataSinkProcessor; +import org.apache.flink.tez.runtime.TezTaskConfig; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; + +import java.io.IOException; + +public class FlinkDataSinkVertex extends FlinkVertex { + + public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { + super(taskName, parallelism, taskConfig); + } + + @Override + public Vertex createVertex(TezConfiguration conf) { + try { + this.writeInputPositionsToConfig(); + this.writeSubTasksInOutputToConfig(); + + conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); + + ProcessorDescriptor descriptor = ProcessorDescriptor.create( + DataSinkProcessor.class.getName()); + + descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + + cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); + + return cached; + } + catch (IOException e) { + throw new CompilerException( + "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java new file mode 100644 index 0000000..913b854 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.runtime.DataSourceProcessor; +import org.apache.flink.tez.runtime.TezTaskConfig; +import org.apache.flink.tez.runtime.input.FlinkInput; +import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.DataSourceDescriptor; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.InputInitializerDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; + +import java.io.IOException; + +public class FlinkDataSourceVertex extends FlinkVertex { + + public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { + super(taskName, parallelism, taskConfig); + } + + + @Override + public Vertex createVertex (TezConfiguration conf) { + try { + this.writeInputPositionsToConfig(); + this.writeSubTasksInOutputToConfig(); + + taskConfig.setDatasourceProcessorName(this.getUniqueName()); + conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); + + ProcessorDescriptor descriptor = ProcessorDescriptor.create( + DataSourceProcessor.class.getName()); + + descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + + InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName()); + + InputInitializerDescriptor inputInitializerDescriptor = + InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + + DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create( + inputDescriptor, + inputInitializerDescriptor, + new Credentials() + ); + + cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); + + cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor); + + return cached; + } + catch (IOException e) { + throw new CompilerException( + "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java new file mode 100644 index 0000000..181e675 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.TezConfiguration; + +public abstract class FlinkEdge { + + protected FlinkVertex source; + protected FlinkVertex target; + protected TypeSerializer<?> typeSerializer; + protected Edge cached; + + protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { + this.source = source; + this.target = target; + this.typeSerializer = typeSerializer; + } + + public abstract Edge createEdge(TezConfiguration tezConf); + + public Edge getEdge () { + return cached; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java new file mode 100644 index 0000000..4602e96 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.flink.tez.util.FlinkSerialization; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig; + +import java.util.HashMap; +import java.util.Map; + +public class FlinkForwardEdge extends FlinkEdge { + + public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { + super(source, target, typeSerializer); + } + + @Override + public Edge createEdge(TezConfiguration tezConf) { + + Map<String,String> serializerMap = new HashMap<String,String>(); + serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); + + try { + UnorderedKVEdgeConfig edgeConfig = + (UnorderedKVEdgeConfig + .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName()) + .setFromConfiguration(tezConf) + .setKeySerializationClass(WritableSerialization.class.getName(), null) + .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap) + .configureInput() + .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString( + this.typeSerializer + ))) + .done() + .build(); + + EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty(); + this.cached = Edge.create(source.getVertex(), target.getVertex(), property); + return cached; + + } catch (Exception e) { + throw new CompilerException( + "An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java new file mode 100644 index 0000000..b5f8c2e --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.flink.tez.util.FlinkSerialization; +import org.apache.flink.tez.runtime.output.SimplePartitioner; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.serializer.WritableSerialization; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig; + +import java.util.HashMap; +import java.util.Map; + +public class FlinkPartitionEdge extends FlinkEdge { + + public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) { + super(source, target, typeSerializer); + } + + @Override + public Edge createEdge(TezConfiguration tezConf) { + + Map<String,String> serializerMap = new HashMap<String,String>(); + serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)); + + try { + UnorderedPartitionedKVEdgeConfig edgeConfig = + (UnorderedPartitionedKVEdgeConfig + .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName()) + .setFromConfiguration(tezConf) + .setKeySerializationClass(WritableSerialization.class.getName(), null) + .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap) + .configureInput() + .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))) + .done() + .build(); + + + EdgeProperty property = edgeConfig.createDefaultEdgeProperty(); + this.cached = Edge.create(source.getVertex(), target.getVertex(), property); + return cached; + + } catch (Exception e) { + throw new CompilerException( + "An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java new file mode 100644 index 0000000..2fbba36 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.dag; + +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.runtime.RegularProcessor; +import org.apache.flink.tez.runtime.TezTaskConfig; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; + +import java.io.IOException; + + +public class FlinkProcessorVertex extends FlinkVertex { + + public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { + super(taskName, parallelism, taskConfig); + } + + @Override + public Vertex createVertex(TezConfiguration conf) { + try { + this.writeInputPositionsToConfig(); + this.writeSubTasksInOutputToConfig(); + + conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); + + ProcessorDescriptor descriptor = ProcessorDescriptor.create( + RegularProcessor.class.getName()); + + descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + + cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); + + return cached; + } catch (IOException e) { + throw new CompilerException( + "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java new file mode 100644 index 0000000..0cf9990 --- /dev/null +++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.tez.dag; + + +import org.apache.flink.optimizer.CompilerException; +import org.apache.flink.tez.runtime.UnionProcessor; +import org.apache.flink.tez.runtime.TezTaskConfig; +import org.apache.flink.tez.util.EncodingUtils; +import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; + +import java.io.IOException; + +public class FlinkUnionVertex extends FlinkVertex { + + public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) { + super(taskName, parallelism, taskConfig); + } + + @Override + public Vertex createVertex(TezConfiguration conf) { + try { + this.writeInputPositionsToConfig(); + this.writeSubTasksInOutputToConfig(); + + conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig)); + + ProcessorDescriptor descriptor = ProcessorDescriptor.create( + UnionProcessor.class.getName()); + + descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + + cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism()); + + return cached; + } + catch (IOException e) { + throw new CompilerException( + "An error occurred while creating a Tez Vertex: " + e.getMessage(), e); + } + } +}
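The vertex and edge wrappers above all follow one pattern: each FlinkVertex subclass serializes its TezTaskConfig into the vertex user payload and creates a Tez Vertex for the matching processor, and each FlinkEdge subclass creates a Tez Edge whose connection pattern matches the Flink ship strategy (forward maps to one-to-one, broadcast to broadcast, partition to scatter-gather). The sketch below is a condensed illustration of how a DAG generator such as TezDAGGenerator might compose them; it is not code from this commit, and the FlinkVertex instances and the serializer are assumed to come from plan translation, which is outside this excerpt. Note the ordering constraint visible in the sources above: createVertex() must run before createEdge(), because the edges connect the cached Tez vertices via getVertex().

package org.apache.flink.tez.dag; // hypothetical sketch, placed in the same package

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;

public class DagAssemblySketch {

	static DAG assemble(TezConfiguration tezConf, FlinkVertex source, FlinkVertex mapper,
			FlinkVertex sink, TypeSerializer<?> serializer) {
		DAG dag = DAG.create("flink-on-tez-sketch");

		// First create the Tez vertices, so that each wrapper caches its Vertex ...
		dag.addVertex(source.createVertex(tezConf));
		dag.addVertex(mapper.createVertex(tezConf));
		dag.addVertex(sink.createVertex(tezConf));

		// ... then connect them; createEdge() reads the cached vertices via getVertex().
		dag.addEdge(new FlinkForwardEdge(source, mapper, serializer).createEdge(tezConf));
		dag.addEdge(new FlinkPartitionEdge(mapper, sink, serializer).createEdge(tezConf));

		return dag;
	}
}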