http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
new file mode 100644
index 0000000..614c5b7
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormat.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.configuration.Configuration;
+
+/**
+ * OutputFormat to write tuples into a database.
+ * The OutputFormat has to be configured using the supplied OutputFormatBuilder.
+ * 
+ * @param <OUT> type of the tuple to write
+ * @see Tuple
+ * @see DriverManager
+ */
+public class JDBCOutputFormat<OUT extends Tuple> extends RichOutputFormat<OUT> {
+       private static final long serialVersionUID = 1L;
+
+       private static final Logger LOG = LoggerFactory.getLogger(JDBCOutputFormat.class);
+
+       private String username;
+       private String password;
+       private String drivername;
+       private String dbURL;
+       private String query;
+       private int batchInterval = 5000;
+
+       private Connection dbConn;
+       private PreparedStatement upload;
+
+       private SupportedTypes[] types = null;
+
+       private int batchCount = 0;
+
+       public JDBCOutputFormat() {
+       }
+
+       @Override
+       public void configure(Configuration parameters) {
+       }
+
+       /**
+        * Connects to the target database and initializes the prepared statement.
+        *
+        * @param taskNumber The number of the parallel instance.
+        * @throws IOException Thrown, if the output could not be opened due to an
+        * I/O problem.
+        */
+       @Override
+       public void open(int taskNumber, int numTasks) throws IOException {
+               try {
+                       establishConnection();
+                       upload = dbConn.prepareStatement(query);
+               } catch (SQLException sqe) {
+                       close();
+                       throw new IllegalArgumentException("open() failed:\t!", 
sqe);
+               } catch (ClassNotFoundException cnfe) {
+                       close();
+                       throw new IllegalArgumentException("JDBC-Class not 
found:\t", cnfe);
+               }
+       }
+
+       private void establishConnection() throws SQLException, ClassNotFoundException {
+               Class.forName(drivername);
+               if (username == null) {
+                       dbConn = DriverManager.getConnection(dbURL);
+               } else {
+                       dbConn = DriverManager.getConnection(dbURL, username, password);
+               }
+       }
+
+       private enum SupportedTypes {
+               BOOLEAN,
+               BYTE,
+               SHORT,
+               INTEGER,
+               LONG,
+               STRING,
+               FLOAT,
+               DOUBLE
+       }
+
+       /**
+        * Adds a record to the prepared statement.
+        * <p>
+        * When this method is called, the output format is guaranteed to be opened.
+        *
+        * @param tuple The record to add to the output.
+        * @throws IOException Thrown, if the record could not be added due to an I/O problem.
+        */
+       @Override
+       public void writeRecord(OUT tuple) throws IOException {
+               try {
+                       if (types == null) {
+                               extractTypes(tuple);
+                       }
+                       addValues(tuple);
+                       upload.addBatch();
+                       batchCount++;
+                       if (batchCount >= batchInterval) {
+                               upload.executeBatch();
+                               batchCount = 0;
+                       }
+               } catch (SQLException sqe) {
+                       close();
+                       throw new IllegalArgumentException("writeRecord() 
failed", sqe);
+               } catch (IllegalArgumentException iae) {
+                       close();
+                       throw new IllegalArgumentException("writeRecord() 
failed", iae);
+               }
+       }
+
+       private void extractTypes(OUT tuple) {
+               types = new SupportedTypes[tuple.getArity()];
+               for (int x = 0; x < tuple.getArity(); x++) {
+                       types[x] = SupportedTypes.valueOf(tuple.getField(x).getClass().getSimpleName().toUpperCase());
+               }
+       }
+
+       private void addValues(OUT tuple) throws SQLException {
+               for (int index = 0; index < tuple.getArity(); index++) {
+                       switch (types[index]) {
+                               case BOOLEAN:
+                                       upload.setBoolean(index + 1, (Boolean) tuple.getField(index));
+                                       break;
+                               case BYTE:
+                                       upload.setByte(index + 1, (Byte) tuple.getField(index));
+                                       break;
+                               case SHORT:
+                                       upload.setShort(index + 1, (Short) tuple.getField(index));
+                                       break;
+                               case INTEGER:
+                                       upload.setInt(index + 1, (Integer) tuple.getField(index));
+                                       break;
+                               case LONG:
+                                       upload.setLong(index + 1, (Long) tuple.getField(index));
+                                       break;
+                               case STRING:
+                                       upload.setString(index + 1, (String) tuple.getField(index));
+                                       break;
+                               case FLOAT:
+                                       upload.setFloat(index + 1, (Float) tuple.getField(index));
+                                       break;
+                               case DOUBLE:
+                                       upload.setDouble(index + 1, (Double) tuple.getField(index));
+                                       break;
+                       }
+               }
+       }
+
+       /**
+        * Executes the prepared statement and closes all resources of this instance.
+        *
+        * @throws IOException Thrown, if the output could not be closed properly.
+        */
+       @Override
+       public void close() throws IOException {
+               try {
+                       if (upload != null) {
+                               upload.executeBatch();
+                       }
+                       batchCount = 0;
+               } catch (SQLException se) {
+                       throw new IllegalArgumentException("close() failed", se);
+               }
+               try {
+                       if (upload != null) {
+                               upload.close();
+                       }
+               } catch (SQLException se) {
+                       LOG.info("JDBC statement could not be closed - " + se.getMessage());
+               }
+               try {
+                       if (dbConn != null) {
+                               dbConn.close();
+                       }
+               } catch (SQLException se) {
+                       LOG.info("JDBC connection could not be closed - " + se.getMessage());
+               }
+       }
+
+       public static JDBCOutputFormatBuilder buildJDBCOutputFormat() {
+               return new JDBCOutputFormatBuilder();
+       }
+
+       public static class JDBCOutputFormatBuilder {
+               private final JDBCOutputFormat format;
+
+               protected JDBCOutputFormatBuilder() {
+                       this.format = new JDBCOutputFormat();
+               }
+
+               public JDBCOutputFormatBuilder setUsername(String username) {
+                       format.username = username;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setPassword(String password) {
+                       format.password = password;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setDrivername(String drivername) {
+                       format.drivername = drivername;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setDBUrl(String dbURL) {
+                       format.dbURL = dbURL;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setQuery(String query) {
+                       format.query = query;
+                       return this;
+               }
+
+               public JDBCOutputFormatBuilder setBatchInterval(int batchInterval) {
+                       format.batchInterval = batchInterval;
+                       return this;
+               }
+
+               /**
+                * Finalizes the configuration and checks validity.
+                *
+                * @return Configured JDBCOutputFormat
+                */
+               public JDBCOutputFormat finish() {
+                       if (format.username == null) {
+                               LOG.info("Username was not supplied separately.");
+                       }
+                       if (format.password == null) {
+                               LOG.info("Password was not supplied separately.");
+                       }
+                       if (format.dbURL == null) {
+                               throw new IllegalArgumentException("No database URL supplied.");
+                       }
+                       if (format.query == null) {
+                               throw new IllegalArgumentException("No query supplied.");
+                       }
+                       if (format.drivername == null) {
+                               throw new IllegalArgumentException("No driver supplied.");
+                       }
+                       return format;
+               }
+       }
+
+}
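
For orientation, a minimal sketch of configuring this output format through its builder; the driver, URL, query, and batch interval are illustrative and mirror the Derby setup used in the example and tests below:

    JDBCOutputFormat jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("insert into newbooks (id, title, author, price, qty) values (?, ?, ?, ?, ?)")
            .setBatchInterval(1000) // flush the batch every 1000 records instead of the default 5000
            .finish();
    // In a job, hand the configured format to a DataSet of matching tuples:
    // dataSet.output(jdbcOutputFormat);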

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
new file mode 100644
index 0000000..7b012ba
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/example/JDBCExample.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc.example;
+
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.DOUBLE_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO;
+import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
+import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+
+public class JDBCExample {
+
+       public static void main(String[] args) throws Exception {
+               prepareTestDb();
+
+               ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+               DataSet<Tuple5> source
+                               = environment.createInput(JDBCInputFormat.buildJDBCInputFormat()
+                                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                                               .setQuery("select * from books")
+                                               .finish(),
+                                               new TupleTypeInfo(Tuple5.class, INT_TYPE_INFO, STRING_TYPE_INFO, STRING_TYPE_INFO, DOUBLE_TYPE_INFO, INT_TYPE_INFO)
+                               );
+
+               source.output(JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("insert into newbooks (id,title,author,price,qty) values (?,?,?,?,?)")
+                               .finish());
+               environment.execute();
+       }
+
+       private static void prepareTestDb() throws Exception {
+               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+               Connection conn = DriverManager.getConnection(dbURL);
+
+               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               Statement stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+
+               sqlQueryBuilder = new StringBuilder("CREATE TABLE newbooks (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+
+               sqlQueryBuilder = new StringBuilder("INSERT INTO books (id, 
title, author, price, qty) VALUES ");
+               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
+               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
+               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
+               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
+               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
+
+               stat = conn.createStatement();
+               stat.execute(sqlQueryBuilder.toString());
+               stat.close();
+
+               conn.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
new file mode 100644
index 0000000..b76f8b8
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormatTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.junit.Assert;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCInputFormatTest {
+       JDBCInputFormat jdbcInputFormat;
+
+       static Connection conn;
+
+       static final Object[][] dbData = {
+               {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
+               {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+               {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
+               {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+               {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
+
+       @BeforeClass
+       public static void setUpClass() {
+               try {
+                       prepareDerbyDatabase();
+               } catch (Exception e) {
+                       Assert.fail();
+               }
+       }
+
+       private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+               System.setProperty("derby.stream.error.field", "org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+               conn = DriverManager.getConnection(dbURL);
+               createTable();
+               insertDataToSQLTable();
+               conn.close();
+       }
+
+       private static void createTable() throws SQLException {
+               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
books (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               Statement stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+       }
+
+       private static void insertDataToSQLTable() throws SQLException {
+               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
+               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
+               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
+               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
+               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
+               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
+
+               Statement stat = conn.createStatement();
+               stat.execute(sqlQueryBuilder.toString());
+               stat.close();
+       }
+
+       @AfterClass
+       public static void tearDownClass() {
+               cleanUpDerbyDatabases();
+       }
+
+       private static void cleanUpDerbyDatabases() {
+               try {
+                       String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+                       Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+                       conn = DriverManager.getConnection(dbURL);
+                       Statement stat = conn.createStatement();
+                       stat.executeUpdate("DROP TABLE books");
+                       stat.close();
+                       conn.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail();
+               }
+       }
+
+       @After
+       public void tearDown() {
+               jdbcInputFormat = null;
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidDriver() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.idontexist")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("select * from books")
+                               .finish();
+               jdbcInputFormat.open(null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidURL() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                               .setQuery("select * from books")
+                               .finish();
+               jdbcInputFormat.open(null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidQuery() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("iamnotsql")
+                               .finish();
+               jdbcInputFormat.open(null);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompleteConfiguration() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setQuery("select * from books")
+                               .finish();
+       }
+
+       @Test(expected = IOException.class)
+       public void testIncompatibleTuple() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("select * from books")
+                               .finish();
+               jdbcInputFormat.open(null);
+               jdbcInputFormat.nextRecord(new Tuple2());
+       }
+
+       @Test
+       public void testJDBCInputFormat() throws IOException {
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("select * from books")
+                               .finish();
+               jdbcInputFormat.open(null);
+               Tuple5 tuple = new Tuple5();
+               int recordCount = 0;
+               while (!jdbcInputFormat.reachedEnd()) {
+                       jdbcInputFormat.nextRecord(tuple);
+                       Assert.assertEquals("Field 0 should be int", 
Integer.class, tuple.getField(0).getClass());
+                       Assert.assertEquals("Field 1 should be String", 
String.class, tuple.getField(1).getClass());
+                       Assert.assertEquals("Field 2 should be String", 
String.class, tuple.getField(2).getClass());
+                       Assert.assertEquals("Field 3 should be float", 
Double.class, tuple.getField(3).getClass());
+                       Assert.assertEquals("Field 4 should be int", 
Integer.class, tuple.getField(4).getClass());
+
+                       for (int x = 0; x < 5; x++) {
+                               Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+                       }
+                       recordCount++;
+               }
+               Assert.assertEquals(5, recordCount);
+       }
+
+}
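
The tests above exercise the input format by hand, outside of a Flink job, and the same pattern works for quick local checks. A minimal sketch, with driver, URL, and query mirroring the Derby fixture:

    JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
            .setDBUrl("jdbc:derby:memory:ebookshop")
            .setQuery("select * from books")
            .finish();
    jdbcInputFormat.open(null);                // no input split is needed for a plain query
    Tuple5 tuple = new Tuple5();
    while (!jdbcInputFormat.reachedEnd()) {
            jdbcInputFormat.nextRecord(tuple); // the reused tuple is filled with the next row
            // ... inspect the tuple ...
    }
    jdbcInputFormat.close();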

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
new file mode 100644
index 0000000..7d004f9
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/java/org/apache/flink/api/java/io/jdbc/JDBCOutputFormatTest.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.io.jdbc;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.junit.Assert;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class JDBCOutputFormatTest {
+       private JDBCInputFormat jdbcInputFormat;
+       private JDBCOutputFormat jdbcOutputFormat;
+
+       private static Connection conn;
+
+       static final Object[][] dbData = {
+               {1001, ("Java for dummies"), ("Tan Ah Teck"), 11.11, 11},
+               {1002, ("More Java for dummies"), ("Tan Ah Teck"), 22.22, 22},
+               {1003, ("More Java for more dummies"), ("Mohammad Ali"), 33.33, 
33},
+               {1004, ("A Cup of Java"), ("Kumar"), 44.44, 44},
+               {1005, ("A Teaspoon of Java"), ("Kevin Jones"), 55.55, 55}};
+
+       @BeforeClass
+       public static void setUpClass() throws SQLException {
+               try {
+                       System.setProperty("derby.stream.error.field", 
"org.apache.flink.api.java.record.io.jdbc.DevNullLogStream.DEV_NULL");
+                       prepareDerbyDatabase();
+               } catch (ClassNotFoundException e) {
+                       e.printStackTrace();
+                       Assert.fail();
+               }
+       }
+
+       private static void prepareDerbyDatabase() throws ClassNotFoundException, SQLException {
+               String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+               Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+               conn = DriverManager.getConnection(dbURL);
+               createTable("books");
+               createTable("newbooks");
+               insertDataToSQLTables();
+               conn.close();
+       }
+
+       private static void createTable(String tableName) throws SQLException {
+               StringBuilder sqlQueryBuilder = new StringBuilder("CREATE TABLE 
");
+               sqlQueryBuilder.append(tableName);
+               sqlQueryBuilder.append(" (");
+               sqlQueryBuilder.append("id INT NOT NULL DEFAULT 0,");
+               sqlQueryBuilder.append("title VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("author VARCHAR(50) DEFAULT NULL,");
+               sqlQueryBuilder.append("price FLOAT DEFAULT NULL,");
+               sqlQueryBuilder.append("qty INT DEFAULT NULL,");
+               sqlQueryBuilder.append("PRIMARY KEY (id))");
+
+               Statement stat = conn.createStatement();
+               stat.executeUpdate(sqlQueryBuilder.toString());
+               stat.close();
+       }
+
+       private static void insertDataToSQLTables() throws SQLException {
+               StringBuilder sqlQueryBuilder = new StringBuilder("INSERT INTO 
books (id, title, author, price, qty) VALUES ");
+               sqlQueryBuilder.append("(1001, 'Java for dummies', 'Tan Ah 
Teck', 11.11, 11),");
+               sqlQueryBuilder.append("(1002, 'More Java for dummies', 'Tan Ah 
Teck', 22.22, 22),");
+               sqlQueryBuilder.append("(1003, 'More Java for more dummies', 
'Mohammad Ali', 33.33, 33),");
+               sqlQueryBuilder.append("(1004, 'A Cup of Java', 'Kumar', 44.44, 
44),");
+               sqlQueryBuilder.append("(1005, 'A Teaspoon of Java', 'Kevin 
Jones', 55.55, 55)");
+
+               Statement stat = conn.createStatement();
+               stat.execute(sqlQueryBuilder.toString());
+               stat.close();
+       }
+
+       @AfterClass
+       public static void tearDownClass() {
+               cleanUpDerbyDatabases();
+       }
+
+       private static void cleanUpDerbyDatabases() {
+               try {
+                       String dbURL = "jdbc:derby:memory:ebookshop;create=true";
+                       Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
+
+                       conn = DriverManager.getConnection(dbURL);
+                       Statement stat = conn.createStatement();
+                       stat.executeUpdate("DROP TABLE books");
+                       stat.executeUpdate("DROP TABLE newbooks");
+                       stat.close();
+                       conn.close();
+               } catch (Exception e) {
+                       e.printStackTrace();
+                       Assert.fail();
+               }
+       }
+
+       @After
+       public void tearDown() {
+               jdbcOutputFormat = null;
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidDriver() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.idontexist")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidURL() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:der:iamanerror:mory:ebookshop")
+                               .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testInvalidQuery() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("iamnotsql")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+       }
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompleteConfiguration() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+                               .finish();
+       }
+
+
+       @Test(expected = IllegalArgumentException.class)
+       public void testIncompatibleTypes() throws IOException {
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
+                               .setDBUrl("jdbc:derby:memory:ebookshop")
+                               .setQuery("insert into books (id, title, author, price, qty) values (?,?,?,?,?)")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+
+               Tuple5 tuple5 = new Tuple5();
+               tuple5.setField(4, 0);
+               tuple5.setField("hello", 1);
+               tuple5.setField("world", 2);
+               tuple5.setField(0.99, 3);
+               tuple5.setField("imthewrongtype", 4);
+
+               jdbcOutputFormat.writeRecord(tuple5);
+               jdbcOutputFormat.close();
+       }
+
+       @Test
+       public void testJDBCOutputFormat() throws IOException {
+               String sourceTable = "books";
+               String targetTable = "newbooks";
+               String driverPath = "org.apache.derby.jdbc.EmbeddedDriver";
+               String dbUrl = "jdbc:derby:memory:ebookshop";
+
+               jdbcOutputFormat = JDBCOutputFormat.buildJDBCOutputFormat()
+                               .setDBUrl(dbUrl)
+                               .setDrivername(driverPath)
+                               .setQuery("insert into " + targetTable + " (id, 
title, author, price, qty) values (?,?,?,?,?)")
+                               .finish();
+               jdbcOutputFormat.open(0, 1);
+
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(driverPath)
+                               .setDBUrl(dbUrl)
+                               .setQuery("select * from " + sourceTable)
+                               .finish();
+               jdbcInputFormat.open(null);
+
+               Tuple5 tuple = new Tuple5();
+               while (!jdbcInputFormat.reachedEnd()) {
+                       jdbcInputFormat.nextRecord(tuple);
+                       jdbcOutputFormat.writeRecord(tuple);
+               }
+
+               jdbcOutputFormat.close();
+               jdbcInputFormat.close();
+
+               jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat()
+                               .setDrivername(driverPath)
+                               .setDBUrl(dbUrl)
+                               .setQuery("select * from " + targetTable)
+                               .finish();
+               jdbcInputFormat.open(null);
+
+               int recordCount = 0;
+               while (!jdbcInputFormat.reachedEnd()) {
+                       jdbcInputFormat.nextRecord(tuple);
+                       Assert.assertEquals("Field 0 should be int", 
Integer.class, tuple.getField(0).getClass());
+                       Assert.assertEquals("Field 1 should be String", 
String.class, tuple.getField(1).getClass());
+                       Assert.assertEquals("Field 2 should be String", 
String.class, tuple.getField(2).getClass());
+                       Assert.assertEquals("Field 3 should be float", 
Double.class, tuple.getField(3).getClass());
+                       Assert.assertEquals("Field 4 should be int", 
Integer.class, tuple.getField(4).getClass());
+
+                       for (int x = 0; x < 5; x++) {
+                               Assert.assertEquals(dbData[recordCount][x], tuple.getField(x));
+                       }
+
+                       recordCount++;
+               }
+               Assert.assertEquals(5, recordCount);
+
+               jdbcInputFormat.close();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
new file mode 100644
index 0000000..2fb9345
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/resources/log4j-test.properties
@@ -0,0 +1,19 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..8b3bb27
--- /dev/null
+++ b/flink-batch-connectors/flink-jdbc/src/test/resources/logback-test.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="WARN">
+        <appender-ref ref="STDOUT"/>
+    </root>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-batch-connectors/pom.xml
----------------------------------------------------------------------
diff --git a/flink-batch-connectors/pom.xml b/flink-batch-connectors/pom.xml
new file mode 100644
index 0000000..25f98d1
--- /dev/null
+++ b/flink-batch-connectors/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+       <parent>
+               <groupId>org.apache.flink</groupId>
+               <artifactId>flink-parent</artifactId>
+               <version>1.0-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+
+
+       <artifactId>flink-batch-connectors</artifactId>
+       <name>flink-batch-connectors</name>
+       <packaging>pom</packaging>
+
+       <modules>
+               <module>flink-avro</module>
+               <module>flink-jdbc</module>
+               <module>flink-hadoop-compatibility</module>
+               <module>flink-hbase</module>
+               <module>flink-hcatalog</module>
+       </modules>
+       
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/pom.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/pom.xml b/flink-contrib/flink-tez/pom.xml
new file mode 100644
index 0000000..412640a
--- /dev/null
+++ b/flink-contrib/flink-tez/pom.xml
@@ -0,0 +1,224 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-contrib-parent</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+
+    <artifactId>flink-tez</artifactId>
+    <name>flink-tez</name>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-runtime</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-optimizer</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-examples-batch</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-tests</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.flink</groupId>
+                    <artifactId>${shading-artifact.name}</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-api</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-common</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-dag</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tez</groupId>
+            <artifactId>tez-runtime-library</artifactId>
+            <version>${tez.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-client</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-common</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+        
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-yarn-api</artifactId>
+            <version>${hadoop.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-codec</groupId>
+            <artifactId>commons-codec</artifactId>
+            <version>1.4</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>2.4.1</version>
+                <configuration>
+                    <descriptors>
+                        <descriptor>${basedir}/src/assembly/flink-fat-jar.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass>org.apache.flink.tez.examples.ExampleDriver</mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <!--<id>assemble-all</id>-->
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
new file mode 100644
index 0000000..504761a
--- /dev/null
+++ b/flink-contrib/flink-tez/src/assembly/flink-fat-jar.xml
@@ -0,0 +1,42 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+       <id>flink-fat-jar</id>
+       <formats>
+               <format>jar</format>
+       </formats>
+       <includeBaseDirectory>false</includeBaseDirectory>
+       <dependencySets>
+               <dependencySet>
+                       <outputDirectory>/</outputDirectory>
+                       <useProjectArtifact>true</useProjectArtifact>
+                       <!--<excludes>
+                               <exclude>org.apache.flink:*</exclude>
+                       </excludes>-->
+                       <useTransitiveFiltering>true</useTransitiveFiltering>
+                       <unpack>true</unpack>
+                       <scope>runtime</scope>
+            <excludes>
+                <exclude>com.google.guava:guava</exclude>
+            </excludes>
+               </dependencySet>
+       </dependencySets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
new file mode 100644
index 0000000..4c091e5
--- /dev/null
+++ b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/LocalTezEnvironment.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+
+public class LocalTezEnvironment extends ExecutionEnvironment {
+
+       TezExecutor executor;
+       Optimizer compiler;
+
+       private LocalTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new Configuration());
+               executor = new TezExecutor(compiler, this.getParallelism());
+       }
+
+       public static LocalTezEnvironment create() {
+               return new LocalTezEnvironment();
+       }
+
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+               TezConfiguration tezConf = new TezConfiguration();
+               tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
+               tezConf.set("fs.defaultFS", "file:///");
+		tezConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);
+               executor.setConfiguration(tezConf);
+               return executor.executePlan(createProgramPlan(jobName));
+       }
+
+       @Override
+       public String getExecutionPlan() throws Exception {
+               Plan p = createProgramPlan(null, false);
+               return executor.getOptimizerPlanAsJSON(p);
+       }
+
+       public void setAsContext() {
+		ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
+			@Override
+			public ExecutionEnvironment createExecutionEnvironment() {
+                               return LocalTezEnvironment.this;
+                       }
+               };
+               initializeContextEnvironment(factory);
+       }
+
+       @Override
+       public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+       }
+}

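For readers who want to try local mode, a minimal driver might look like the sketch below. This is not part of the commit: the standard DataSet API is assumed, and WordCountJob, the sample data, and /tmp/tez-out are illustrative names only.

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.tez.client.LocalTezEnvironment;

    public class WordCountJob {
        public static void main(String[] args) throws Exception {
            // Create the Tez-backed environment and register it as the context
            // environment, so code calling getExecutionEnvironment() picks it up.
            LocalTezEnvironment env = LocalTezEnvironment.create();
            env.setAsContext();

            DataSet<String> words = env.fromElements("to", "be", "or", "not", "to", "be");
            words.writeAsText("/tmp/tez-out");  // hypothetical output path

            env.execute("wordcount-on-local-tez");  // builds the plan and runs it on local Tez
        }
    }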
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
new file mode 100644
index 0000000..131937e
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/RemoteTezEnvironment.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ClassUtil;
+import org.apache.hadoop.util.ToolRunner;
+
+
+public class RemoteTezEnvironment extends ExecutionEnvironment {
+
+	private static final Log LOG = LogFactory.getLog(RemoteTezEnvironment.class);
+       
+       private Optimizer compiler;
+       private TezExecutor executor;
+       private Path jarPath = null;
+       
+
+	public void registerMainClass(Class<?> mainClass) {
+		jarPath = new Path(ClassUtil.findContainingJar(mainClass));
+		LOG.info("Registering main class " + mainClass.getName() + " contained in " + jarPath.toString());
+       }
+
+       @Override
+       public JobExecutionResult execute(String jobName) throws Exception {
+		TezExecutorTool tool = new TezExecutorTool(executor, createProgramPlan());
+		if (jarPath != null) {
+			tool.setJobJar(jarPath);
+		}
+		// Returning from a finally block would swallow any exception thrown by the
+		// tool, so check the exit code and propagate failures instead.
+		int exitCode = ToolRunner.run(new Configuration(), tool, new String[]{jobName});
+		if (exitCode != 0) {
+			throw new RuntimeException("Job " + jobName + " failed with exit code " + exitCode);
+		}
+		return new JobExecutionResult(null, -1, null); // TODO report actual runtime and accumulators
+	}
+
+       @Override
+       public String getExecutionPlan() throws Exception {
+               Plan p = createProgramPlan(null, false);
+               return executor.getOptimizerPlanAsJSON(p);
+       }
+
+       public static RemoteTezEnvironment create () {
+               return new RemoteTezEnvironment();
+       }
+
+       public RemoteTezEnvironment() {
+		compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), new org.apache.flink.configuration.Configuration());
+               executor = new TezExecutor(compiler, getParallelism());
+       }
+
+       @Override
+       public void startNewSession() throws Exception {
+		throw new UnsupportedOperationException("Session management is not implemented in Flink on Tez.");
+       }
+}

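The remote variant additionally needs the jar containing the user code, which registerMainClass() locates via ClassUtil.findContainingJar(). A hedged usage sketch, where MyJob is a placeholder for the user's main class:

    RemoteTezEnvironment env = RemoteTezEnvironment.create();
    env.registerMainClass(MyJob.class);  // ship the jar containing MyJob to the cluster
    // ... build the program on env as usual ...
    env.execute("my-job-on-yarn");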
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
new file mode 100644
index 0000000..60449db
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.PlanExecutor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.tez.dag.TezDAGGenerator;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.client.TezClient;
+import org.apache.tez.client.TezClientUtils;
+import org.apache.tez.common.TezCommonUtils;
+import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.client.DAGClient;
+import org.apache.tez.dag.api.client.DAGStatus;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class TezExecutor extends PlanExecutor {
+
+       private static final Log LOG = LogFactory.getLog(TezExecutor.class);
+
+       private TezConfiguration tezConf;
+       private Optimizer compiler;
+       
+       private Path jarPath;
+
+       private long runTime = -1; //TODO get DAG execution time from Tez
+       private int parallelism;
+
+	public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) {
+               this.tezConf = tezConf;
+               this.compiler = compiler;
+               this.parallelism = parallelism;
+       }
+
+       public TezExecutor(Optimizer compiler, int parallelism) {
+               this.tezConf = null;
+               this.compiler = compiler;
+               this.parallelism = parallelism;
+       }
+
+       public void setConfiguration (TezConfiguration tezConf) {
+               this.tezConf = tezConf;
+       }
+
+	private JobExecutionResult executePlanWithConf(TezConfiguration tezConf, Plan plan) throws Exception {
+
+               String jobName = plan.getJobName();
+
+               TezClient tezClient = TezClient.create(jobName, tezConf);
+               tezClient.start();
+               try {
+			OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+			TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
+                       DAG dag = dagGenerator.createDAG(optPlan);
+
+                       if (jarPath != null) {
+                               addLocalResource(tezConf, jarPath, dag);
+                       }
+
+                       tezClient.waitTillReady();
+                       LOG.info("Submitting DAG to Tez Client");
+                       DAGClient dagClient = tezClient.submitDAG(dag);
+
+                       LOG.info("Submitted DAG to Tez Client");
+
+                       // monitoring
+                       DAGStatus dagStatus = dagClient.waitForCompletion();
+
+                       if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
+				LOG.error(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+				throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
+                       }
+                       LOG.info(jobName + " finished successfully");
+
+                       return new JobExecutionResult(null, runTime, null);
+
+               }
+               finally {
+                       tezClient.stop();
+               }
+       }
+
+       @Override
+       public void start() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+       }
+
+       @Override
+       public void stop() throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+       }
+
+       @Override
+       public void endSession(JobID jobID) throws Exception {
+		throw new IllegalStateException("Session management is not supported in the TezExecutor.");
+       }
+
+       @Override
+       public boolean isRunning() {
+               return false;
+       }
+
+       @Override
+       public JobExecutionResult executePlan(Plan plan) throws Exception {
+               return executePlanWithConf(tezConf, plan);
+       }
+       
+	private static void addLocalResource(TezConfiguration tezConf, Path jarPath, DAG dag) {
+               
+               try {
+			org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf);
+
+                       LOG.info("Jar path received is " + jarPath.toString());
+
+                       String jarFile = jarPath.getName();
+
+			Path remoteJarPath = null;
+
+			/*
+			if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) {
+				LOG.info("Tez staging directory is null, setting it.");
+				Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+				LOG.info("Setting Tez staging directory to " + stagingDir.toString());
+				tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+				LOG.info("Set Tez staging directory to " + stagingDir.toString());
+			}
+			Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR));
+			LOG.info("Ensuring that Tez staging directory exists");
+			TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+			LOG.info("Tez staging directory exists and is " + stagingDir.toString());
+			*/
+
+			Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf);
+			LOG.info("Tez staging path is " + stagingDir);
+			TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
+			LOG.info("Tez staging dir exists");
+
+			remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile));
+			LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString());
+			fs.copyFromLocalFile(jarPath, remoteJarPath);
+
+			FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
+			Credentials credentials = new Credentials();
+			TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf);
+
+			Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
+			LocalResource jobJar = LocalResource.newInstance(
+					ConverterUtils.getYarnUrlFromPath(remoteJarPath),
+					LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
+					remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
+			localResources.put(jarFile, jobJar);
+
+                       dag.addTaskLocalFiles(localResources);
+
+                       LOG.info("Added job jar as local resource.");
+               }
+			catch (Exception e) {
+				// Propagate the failure instead of printing it and killing the JVM with System.exit().
+				throw new RuntimeException("Could not add job jar as a local resource: " + e.getMessage(), e);
+			}
+       }
+       
+       public void setJobJar (Path jarPath) {
+               this.jarPath = jarPath;
+       }
+
+
+       @Override
+       public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
+               OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
+               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
+               return jsonGen.getOptimizerPlanAsJSON(optPlan);
+       }
+
+	public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
+               if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
+                       p.setDefaultParallelism(parallelism);
+               }
+               return this.compiler.compile(p);
+       }
+}

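TezExecutor can also be driven directly, outside the environment classes. A minimal sketch, assuming a Plan obtained elsewhere (e.g. from the environment's createProgramPlan()) and a parallelism of 4:

    Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(),
            new org.apache.flink.configuration.Configuration());
    TezExecutor executor = new TezExecutor(compiler, 4);

    TezConfiguration tezConf = new TezConfiguration();
    tezConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);  // or cluster settings
    executor.setConfiguration(tezConf);

    JobExecutionResult result = executor.executePlan(plan);  // 'plan' assumed given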
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
new file mode 100644
index 0000000..09289fb
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.Plan;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Tool;
+import org.apache.tez.dag.api.TezConfiguration;
+
+
+public class TezExecutorTool extends Configured implements Tool {
+
+       private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
+
+       private TezExecutor executor;
+       Plan plan;
+       private Path jarPath = null;
+
+       public TezExecutorTool(TezExecutor executor, Plan plan) {
+               this.executor = executor;
+               this.plan = plan;
+       }
+
+       public void setJobJar (Path jarPath) {
+               this.jarPath = jarPath;
+       }
+
+       @Override
+       public int run(String[] args) throws Exception {
+               
+               Configuration conf = getConf();
+               
+               TezConfiguration tezConf;
+               if (conf != null) {
+                       tezConf = new TezConfiguration(conf);
+               } else {
+                       tezConf = new TezConfiguration();
+               }
+
+               UserGroupInformation.setConfiguration(tezConf);
+
+               executor.setConfiguration(tezConf);
+
+		try {
+			if (jarPath != null) {
+				executor.setJobJar(jarPath);
+			}
+			executor.executePlan(plan);
+		}
+		catch (Exception e) {
+			LOG.error("Job execution failed due to: " + e.getMessage());
+			// Wrap with the original exception as cause so the stack trace is preserved.
+			throw new RuntimeException(e);
+		}
+		return 0;
+       }
+
+
+}

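Because the tool implements Hadoop's Tool interface, it picks up generic options (-D, -conf, and so on) when launched through ToolRunner, which is exactly how RemoteTezEnvironment invokes it. A short sketch, with executor and plan assumed given:

    TezExecutorTool tool = new TezExecutorTool(executor, plan);
    int exitCode = ToolRunner.run(new org.apache.hadoop.conf.Configuration(), tool,
            new String[]{"my-job"});  // non-zero exit code signals failure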
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
new file mode 100644
index 0000000..6597733
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkBroadcastEdge extends FlinkEdge {
+
+	public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+               super(source, target, typeSerializer);
+       }
+
+       @Override
+       public Edge createEdge(TezConfiguration tezConf) {
+
+		Map<String, String> serializerMap = new HashMap<String, String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedKVEdgeConfig edgeConfig =
+					(UnorderedKVEdgeConfig
+							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+							.setFromConfiguration(tezConf)
+							.setKeySerializationClass(WritableSerialization.class.getName(), null)
+							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+							.configureInput()
+							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
+							.done()
+							.build();
+
+			EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Broadcast Edge: " + e.getMessage(), e);
+		}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
new file mode 100644
index 0000000..e3ddb9e
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSinkProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkDataSinkVertex extends FlinkVertex {
+
+	public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+               super(taskName, parallelism, taskConfig);
+       }
+
+       @Override
+       public Vertex createVertex(TezConfiguration conf) {
+               try {
+                       this.writeInputPositionsToConfig();
+                       this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					DataSinkProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
new file mode 100644
index 0000000..913b854
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.DataSourceProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.runtime.input.FlinkInput;
+import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.DataSourceDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.InputInitializerDescriptor;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkDataSourceVertex extends FlinkVertex {
+
+	public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+               super(taskName, parallelism, taskConfig);
+       }
+
+
+       @Override
+       public Vertex createVertex (TezConfiguration conf) {
+               try {
+                       this.writeInputPositionsToConfig();
+                       this.writeSubTasksInOutputToConfig();
+
+			taskConfig.setDatasourceProcessorName(this.getUniqueName());
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					DataSourceProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName());
+
+			InputInitializerDescriptor inputInitializerDescriptor =
+					InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create(
+					inputDescriptor,
+					inputInitializerDescriptor,
+					new Credentials()
+			);
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor);
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
new file mode 100644
index 0000000..181e675
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.TezConfiguration;
+
+public abstract class FlinkEdge {
+
+       protected FlinkVertex source;
+       protected FlinkVertex target;
+       protected TypeSerializer<?> typeSerializer;
+       protected Edge cached;
+
+	protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+               this.source = source;
+               this.target = target;
+               this.typeSerializer = typeSerializer;
+       }
+
+       public abstract Edge createEdge(TezConfiguration tezConf);
+
+	public Edge getEdge() {
+               return cached;
+       }
+
+}

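The concrete subclasses in this commit (FlinkForwardEdge, FlinkBroadcastEdge, FlinkPartitionEdge) map Flink shipping strategies onto Tez edge types. A hypothetical factory method illustrates the correspondence; the switch cases are assumptions inferred from the subclass names, and the actual mapping lives in the DAG generator:

    static FlinkEdge edgeFor(ShipStrategyType strategy, FlinkVertex src, FlinkVertex dst,
            TypeSerializer<?> serializer) {
        switch (strategy) {
            case FORWARD:        return new FlinkForwardEdge(src, dst, serializer);    // one-to-one
            case BROADCAST:      return new FlinkBroadcastEdge(src, dst, serializer);  // replicate to all
            case PARTITION_HASH: return new FlinkPartitionEdge(src, dst, serializer);  // shuffle
            default: throw new CompilerException("Unsupported ship strategy: " + strategy);
        }
    }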
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
new file mode 100644
index 0000000..4602e96
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkForwardEdge extends FlinkEdge {
+
+	public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+               super(source, target, typeSerializer);
+       }
+
+       @Override
+       public Edge createEdge(TezConfiguration tezConf) {
+
+               Map<String,String> serializerMap = new HashMap<String,String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedKVEdgeConfig edgeConfig =
+					(UnorderedKVEdgeConfig
+							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
+							.setFromConfiguration(tezConf)
+							.setKeySerializationClass(WritableSerialization.class.getName(), null)
+							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+							.configureInput()
+							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
+							.done()
+							.build();
+
+			EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
+		}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
new file mode 100644
index 0000000..b5f8c2e
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.flink.tez.util.FlinkSerialization;
+import org.apache.flink.tez.runtime.output.SimplePartitioner;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.serializer.WritableSerialization;
+import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class FlinkPartitionEdge extends FlinkEdge {
+
+	public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
+               super(source, target, typeSerializer);
+       }
+
+       @Override
+       public Edge createEdge(TezConfiguration tezConf) {
+
+               Map<String,String> serializerMap = new HashMap<String,String>();
+		serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
+
+		try {
+			UnorderedPartitionedKVEdgeConfig edgeConfig =
+					(UnorderedPartitionedKVEdgeConfig
+							.newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName())
+							.setFromConfiguration(tezConf)
+							.setKeySerializationClass(WritableSerialization.class.getName(), null)
+							.setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
+							.configureInput()
+							.setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
+							.done()
+							.build();
+
+			EdgeProperty property = edgeConfig.createDefaultEdgeProperty();
+			this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
+			return cached;
+
+		} catch (Exception e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e);
+		}
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
new file mode 100644
index 0000000..2fbba36
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.RegularProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+
+public class FlinkProcessorVertex extends FlinkVertex {
+
+	public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+               super(taskName, parallelism, taskConfig);
+       }
+
+       @Override
+       public Vertex createVertex(TezConfiguration conf) {
+               try {
+                       this.writeInputPositionsToConfig();
+                       this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					RegularProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		} catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
new file mode 100644
index 0000000..0cf9990
--- /dev/null
+++ 
b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tez.dag;
+
+
+import org.apache.flink.optimizer.CompilerException;
+import org.apache.flink.tez.runtime.UnionProcessor;
+import org.apache.flink.tez.runtime.TezTaskConfig;
+import org.apache.flink.tez.util.EncodingUtils;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
+
+import java.io.IOException;
+
+public class FlinkUnionVertex extends FlinkVertex {
+
+	public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
+               super(taskName, parallelism, taskConfig);
+       }
+
+       @Override
+       public Vertex createVertex(TezConfiguration conf) {
+               try {
+                       this.writeInputPositionsToConfig();
+                       this.writeSubTasksInOutputToConfig();
+
+			conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
+
+			ProcessorDescriptor descriptor = ProcessorDescriptor.create(
+					UnionProcessor.class.getName());
+
+			descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+
+			cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
+
+			return cached;
+		}
+		catch (IOException e) {
+			throw new CompilerException(
+					"An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
+		}
+       }
+}

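Taken together, the vertex and edge wrappers above are what a DAG generator stitches into a Tez DAG. A hedged sketch of that assembly step, where the vertices/edges collections and the DAG name are assumptions (the real logic lives in TezDAGGenerator):

    DAG dag = DAG.create("flink-on-tez-job");
    for (FlinkVertex fv : vertices) {
        dag.addVertex(fv.createVertex(new TezConfiguration(tezConf)));  // per-vertex config copy
    }
    for (FlinkEdge fe : edges) {
        dag.addEdge(fe.createEdge(tezConf));
    }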