http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
deleted file mode 100644
index 60449db..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutor.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.PlanExecutor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.Optimizer;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.tez.dag.TezDAGGenerator;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.tez.client.TezClient;
-import org.apache.tez.client.TezClientUtils;
-import org.apache.tez.common.TezCommonUtils;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class TezExecutor extends PlanExecutor {
-
-       private static final Log LOG = LogFactory.getLog(TezExecutor.class);
-
-       private TezConfiguration tezConf;
-       private Optimizer compiler;
-       
-       private Path jarPath;
-
-       private long runTime = -1; //TODO get DAG execution time from Tez
-       private int parallelism;
-
-       public TezExecutor(TezConfiguration tezConf, Optimizer compiler, int parallelism) {
-               this.tezConf = tezConf;
-               this.compiler = compiler;
-               this.parallelism = parallelism;
-       }
-
-       public TezExecutor(Optimizer compiler, int parallelism) {
-               this.tezConf = null;
-               this.compiler = compiler;
-               this.parallelism = parallelism;
-       }
-
-       public void setConfiguration (TezConfiguration tezConf) {
-               this.tezConf = tezConf;
-       }
-
-       private JobExecutionResult executePlanWithConf (TezConfiguration tezConf, Plan plan) throws Exception {
-
-               String jobName = plan.getJobName();
-
-               TezClient tezClient = TezClient.create(jobName, tezConf);
-               tezClient.start();
-               try {
-                       OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
-                       TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, new Configuration());
-                       DAG dag = dagGenerator.createDAG(optPlan);
-
-                       if (jarPath != null) {
-                               addLocalResource(tezConf, jarPath, dag);
-                       }
-
-                       tezClient.waitTillReady();
-                       LOG.info("Submitting DAG to Tez Client");
-                       DAGClient dagClient = tezClient.submitDAG(dag);
-
-                       LOG.info("Submitted DAG to Tez Client");
-
-                       // monitoring
-                       DAGStatus dagStatus = dagClient.waitForCompletion();
-
-                       if (dagStatus.getState() != DAGStatus.State.SUCCEEDED) {
-                               LOG.error (jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
-                               throw new RuntimeException(jobName + " failed with diagnostics: " + dagStatus.getDiagnostics());
-                       }
-                       LOG.info(jobName + " finished successfully");
-
-                       return new JobExecutionResult(null, runTime, null);
-
-               }
-               finally {
-                       tezClient.stop();
-               }
-       }
-
-       @Override
-       public void start() throws Exception {
-               throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-       }
-
-       @Override
-       public void stop() throws Exception {
-               throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-       }
-
-       @Override
-       public void endSession(JobID jobID) throws Exception {
-               throw new IllegalStateException("Session management is not supported in the TezExecutor.");
-       }
-
-       @Override
-       public boolean isRunning() {
-               return false;
-       }
-
-       @Override
-       public JobExecutionResult executePlan(Plan plan) throws Exception {
-               return executePlanWithConf(tezConf, plan);
-       }
-       
-       private static void addLocalResource (TezConfiguration tezConf, Path jarPath, DAG dag) {
-
-               try {
-                       org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(tezConf);
-
-                       LOG.info("Jar path received is " + jarPath.toString());
-
-                       String jarFile = jarPath.getName();
-
-                       Path remoteJarPath = null;
-
-                       /*
-                       if (tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR) == null) {
-                               LOG.info("Tez staging directory is null, setting it.");
-                               Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
-                               LOG.info("Setting Tez staging directory to " + stagingDir.toString());
-                               tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
-                               LOG.info("Set Tez staging directory to " + stagingDir.toString());
-                       }
-                       Path stagingDir = new Path(tezConf.get(TezConfiguration.TEZ_AM_STAGING_DIR));
-                       LOG.info("Ensuring that Tez staging directory exists");
-                       TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-                       LOG.info("Tez staging directory exists and is " + stagingDir.toString());
-                       */
-
-
-                       Path stagingDir = TezCommonUtils.getTezBaseStagingPath(tezConf);
-                       LOG.info("Tez staging path is " + stagingDir);
-                       TezClientUtils.ensureStagingDirExists(tezConf, stagingDir);
-                       LOG.info("Tez staging dir exists");
-
-                       remoteJarPath = fs.makeQualified(new Path(stagingDir, jarFile));
-                       LOG.info("Copying " + jarPath.toString() + " to " + remoteJarPath.toString());
-                       fs.copyFromLocalFile(jarPath, remoteJarPath);
-
-
-                       FileStatus remoteJarStatus = fs.getFileStatus(remoteJarPath);
-                       Credentials credentials = new Credentials();
-                       TokenCache.obtainTokensForNamenodes(credentials, new Path[]{remoteJarPath}, tezConf);
-
-                       Map<String, LocalResource> localResources = new TreeMap<String, LocalResource>();
-                       LocalResource jobJar = LocalResource.newInstance(
-                                       ConverterUtils.getYarnUrlFromPath(remoteJarPath),
-                                       LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
-                                       remoteJarStatus.getLen(), remoteJarStatus.getModificationTime());
-                       localResources.put(jarFile.toString(), jobJar);
-
-                       dag.addTaskLocalFiles(localResources);
-
-                       LOG.info("Added job jar as local resource.");
-               }
-               catch (Exception e) {
-                       System.out.println(e.getMessage());
-                       e.printStackTrace();
-                       System.exit(-1);
-               }
-       }
-       
-       public void setJobJar (Path jarPath) {
-               this.jarPath = jarPath;
-       }
-
-
-       @Override
-       public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-               OptimizedPlan optPlan = getOptimizedPlan(plan, parallelism);
-               PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
-               return jsonGen.getOptimizerPlanAsJSON(optPlan);
-       }
-
-       public OptimizedPlan getOptimizedPlan(Plan p, int parallelism) throws CompilerException {
-               if (parallelism > 0 && p.getDefaultParallelism() <= 0) {
-                       p.setDefaultParallelism(parallelism);
-               }
-               return this.compiler.compile(p);
-       }
-}
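
For reference, the client lifecycle that the deleted executePlanWithConf() implemented boils down to the following minimal sketch. It uses only the TezClient calls that appear in the code above; jobName, tezConf, and dag are assumed to be in scope, and error handling is trimmed:

    // Sketch of the submit-and-monitor loop from executePlanWithConf().
    TezClient tezClient = TezClient.create(jobName, tezConf);
    tezClient.start();
    try {
        tezClient.waitTillReady();                        // block until the AM accepts DAGs
        DAGClient dagClient = tezClient.submitDAG(dag);   // submission is asynchronous
        DAGStatus status = dagClient.waitForCompletion(); // poll until the DAG finishes
        if (status.getState() != DAGStatus.State.SUCCEEDED) {
            throw new RuntimeException("DAG failed: " + status.getDiagnostics());
        }
    } finally {
        tezClient.stop();                                 // always release the client
    }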

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
deleted file mode 100644
index 09289fb..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/client/TezExecutorTool.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.client;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.util.Tool;
-import org.apache.tez.dag.api.TezConfiguration;
-
-
-public class TezExecutorTool extends Configured implements Tool {
-
-       private static final Log LOG = LogFactory.getLog(TezExecutorTool.class);
-
-       private TezExecutor executor;
-       Plan plan;
-       private Path jarPath = null;
-
-       public TezExecutorTool(TezExecutor executor, Plan plan) {
-               this.executor = executor;
-               this.plan = plan;
-       }
-
-       public void setJobJar (Path jarPath) {
-               this.jarPath = jarPath;
-       }
-
-       @Override
-       public int run(String[] args) throws Exception {
-               
-               Configuration conf = getConf();
-               
-               TezConfiguration tezConf;
-               if (conf != null) {
-                       tezConf = new TezConfiguration(conf);
-               } else {
-                       tezConf = new TezConfiguration();
-               }
-
-               UserGroupInformation.setConfiguration(tezConf);
-
-               executor.setConfiguration(tezConf);
-
-               try {
-                       if (jarPath != null) {
-                               executor.setJobJar(jarPath);
-                       }
-                       JobExecutionResult result = executor.executePlan(plan);
-               }
-               catch (Exception e) {
-                       LOG.error("Job execution failed due to: " + 
e.getMessage());
-                       throw new RuntimeException(e.getMessage());
-               }
-               return 0;
-       }
-
-
-}
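
Because TezExecutorTool implemented Hadoop's Tool interface, it was intended to be launched through ToolRunner, which parses the generic Hadoop options and supplies the Configuration that run() later retrieves via getConf(). A minimal sketch; the executor, plan, args, and the jar path are assumptions for illustration:

    // Sketch: ToolRunner.run() sets the configuration, then invokes tool.run(args).
    TezExecutorTool tool = new TezExecutorTool(executor, plan);
    tool.setJobJar(new Path("/tmp/flink-job.jar")); // hypothetical local jar location
    int exitCode = ToolRunner.run(new org.apache.hadoop.conf.Configuration(), tool, args);
    System.exit(exitCode);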

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
deleted file mode 100644
index 6597733..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkBroadcastEdge.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkBroadcastEdge extends FlinkEdge {
-
-       public FlinkBroadcastEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedKVEdgeConfig edgeConfig =
-                                       (UnorderedKVEdgeConfig
-                                                       .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
-                                                       .setFromConfiguration(tezConf)
-                                                       .setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                                       .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                                       .configureInput()
-                                                       .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer))
-                                                       )
-                                                       .done()
-                                                       .build();
-
-                       EdgeProperty property = edgeConfig.createDefaultBroadcastEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Broadcast Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
deleted file mode 100644
index e3ddb9e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSinkVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSinkProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSinkVertex extends FlinkVertex {
-
-       public FlinkDataSinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-                                       DataSinkProcessor.class.getName());
-
-                       descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
deleted file mode 100644
index 913b854..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkDataSourceVertex.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.DataSourceProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.runtime.input.FlinkInput;
-import org.apache.flink.tez.runtime.input.FlinkInputSplitGenerator;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.hadoop.security.Credentials;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.DataSourceDescriptor;
-import org.apache.tez.dag.api.InputDescriptor;
-import org.apache.tez.dag.api.InputInitializerDescriptor;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkDataSourceVertex extends FlinkVertex {
-
-       public FlinkDataSourceVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-
-       @Override
-       public Vertex createVertex (TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       taskConfig.setDatasourceProcessorName(this.getUniqueName());
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-                                       DataSourceProcessor.class.getName());
-
-                       descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       InputDescriptor inputDescriptor = InputDescriptor.create(FlinkInput.class.getName());
-
-                       InputInitializerDescriptor inputInitializerDescriptor =
-                                       InputInitializerDescriptor.create(FlinkInputSplitGenerator.class.getName()).setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       DataSourceDescriptor dataSourceDescriptor = DataSourceDescriptor.create(
-                                       inputDescriptor,
-                                       inputInitializerDescriptor,
-                                       new Credentials()
-                       );
-
-                       cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-                       cached.addDataSource("Input " + this.getUniqueName(), dataSourceDescriptor);
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
deleted file mode 100644
index 181e675..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkEdge.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.TezConfiguration;
-
-public abstract class FlinkEdge {
-
-       protected FlinkVertex source;
-       protected FlinkVertex target;
-       protected TypeSerializer<?> typeSerializer;
-       protected Edge cached;
-
-       protected FlinkEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-               this.source = source;
-               this.target = target;
-               this.typeSerializer = typeSerializer;
-       }
-
-       public abstract Edge createEdge(TezConfiguration tezConf);
-
-       public Edge getEdge () {
-               return cached;
-       }
-
-}
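
FlinkEdge encodes a build-then-cache contract: createEdge() in each subclass both returns the Tez Edge and assigns it to the protected cached field, and getEdge() only ever returns that cached instance. Callers therefore have to invoke createEdge() once before getEdge() is meaningful, as in this sketch (the vertices, serializer, and tezConf are assumed to exist):

    // Sketch of the intended call order for a FlinkEdge subclass.
    FlinkEdge edge = new FlinkForwardEdge(sourceVertex, targetVertex, serializer);
    Edge tezEdge = edge.createEdge(new TezConfiguration(tezConf)); // builds and caches
    assert tezEdge == edge.getEdge();                              // same cached instance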

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
deleted file mode 100644
index 4602e96..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkForwardEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkForwardEdge extends FlinkEdge {
-
-       public FlinkForwardEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedKVEdgeConfig edgeConfig =
-                                       (UnorderedKVEdgeConfig
-                                                       .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName())
-                                                       .setFromConfiguration(tezConf)
-                                                       .setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                                       .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                                       .configureInput()
-                                                       .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(
-                                                                       this.typeSerializer
-                                                       )))
-                                                       .done()
-                                                       .build();
-
-                       EdgeProperty property = edgeConfig.createDefaultOneToOneEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Forward Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
deleted file mode 100644
index b5f8c2e..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkPartitionEdge.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.flink.tez.util.FlinkSerialization;
-import org.apache.flink.tez.runtime.output.SimplePartitioner;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.serializer.WritableSerialization;
-import org.apache.tez.dag.api.Edge;
-import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
-
-import java.util.HashMap;
-import java.util.Map;
-
-public class FlinkPartitionEdge extends FlinkEdge {
-
-       public FlinkPartitionEdge(FlinkVertex source, FlinkVertex target, TypeSerializer<?> typeSerializer) {
-               super(source, target, typeSerializer);
-       }
-
-       @Override
-       public Edge createEdge(TezConfiguration tezConf) {
-
-               Map<String,String> serializerMap = new HashMap<String,String>();
-               serializerMap.put("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer));
-
-               try {
-                       UnorderedPartitionedKVEdgeConfig edgeConfig =
-                                       (UnorderedPartitionedKVEdgeConfig
-                                               .newBuilder(IntWritable.class.getName(), typeSerializer.createInstance().getClass().getName(), SimplePartitioner.class.getName())
-                                       .setFromConfiguration(tezConf)
-                                       .setKeySerializationClass(WritableSerialization.class.getName(), null)
-                                       .setValueSerializationClass(FlinkSerialization.class.getName(), serializerMap)
-                                       .configureInput()
-                                       .setAdditionalConfiguration("io.flink.typeserializer", EncodingUtils.encodeObjectToString(this.typeSerializer)))
-                                       .done()
-                                       .build();
-
-                       EdgeProperty property = edgeConfig.createDefaultEdgeProperty();
-                       this.cached = Edge.create(source.getVertex(), target.getVertex(), property);
-                       return cached;
-
-               } catch (Exception e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Shuffle Edge: " + e.getMessage(), e);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
deleted file mode 100644
index 2fbba36..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkProcessorVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.RegularProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-
-public class FlinkProcessorVertex extends FlinkVertex {
-
-       public FlinkProcessorVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-                                       RegularProcessor.class.getName());
-
-                       descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-                       return cached;
-               } catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
deleted file mode 100644
index 0cf9990..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkUnionVertex.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.tez.runtime.UnionProcessor;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.tez.util.EncodingUtils;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.io.IOException;
-
-public class FlinkUnionVertex extends FlinkVertex {
-
-       public FlinkUnionVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-               super(taskName, parallelism, taskConfig);
-       }
-
-       @Override
-       public Vertex createVertex(TezConfiguration conf) {
-               try {
-                       this.writeInputPositionsToConfig();
-                       this.writeSubTasksInOutputToConfig();
-
-                       conf.set(TezTaskConfig.TEZ_TASK_CONFIG, EncodingUtils.encodeObjectToString(taskConfig));
-
-                       ProcessorDescriptor descriptor = ProcessorDescriptor.create(
-                                       UnionProcessor.class.getName());
-
-                       descriptor.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
-
-                       cached = Vertex.create(this.getUniqueName(), descriptor, getParallelism());
-
-                       return cached;
-               }
-               catch (IOException e) {
-                       throw new CompilerException(
-                                       "An error occurred while creating a Tez Vertex: " + e.getMessage(), e);
-               }
-       }
-}
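
All of the FlinkVertex subclasses above share the same transport mechanism: the serialized TezTaskConfig is stored in the configuration under the TEZ_TASK_CONFIG key, and the whole configuration is shipped to the processor as its user payload. At runtime a processor reverses both steps, roughly as sketched below. TezUtils.createConfFromUserPayload() is Tez's counterpart of createUserPayloadFromConf() used above; the decodeObjectFromString() helper and its signature are assumptions, since the decoding side is not part of this diff:

    // Sketch of how a processor could recover its TezTaskConfig from the payload.
    Configuration conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
    TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(
                    conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader());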

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
deleted file mode 100644
index 883acc6..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/FlinkVertex.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.apache.tez.dag.api.Vertex;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public abstract class FlinkVertex {
-
-       protected Vertex cached;
-       private String taskName;
-       private int parallelism;
-       protected TezTaskConfig taskConfig;
-
-       // Tez-specific bookkeeping
-       protected String uniqueName; //Unique name in DAG
-       private Map<FlinkVertex,ArrayList<Integer>> inputPositions;
-       private ArrayList<Integer> numberOfSubTasksInOutputs;
-
-       public TezTaskConfig getConfig() {
-               return taskConfig;
-       }
-
-       public FlinkVertex(String taskName, int parallelism, TezTaskConfig taskConfig) {
-               this.cached = null;
-               this.taskName = taskName;
-               this.parallelism = parallelism;
-               this.taskConfig = taskConfig;
-               this.uniqueName = taskName + UUID.randomUUID().toString();
-               this.inputPositions = new HashMap<FlinkVertex, ArrayList<Integer>>();
-               this.numberOfSubTasksInOutputs = new ArrayList<Integer>();
-       }
-
-       public int getParallelism () {
-               return parallelism;
-       }
-
-       public void setParallelism (int parallelism) {
-               this.parallelism = parallelism;
-       }
-
-       public abstract Vertex createVertex (TezConfiguration conf);
-
-       public Vertex getVertex () {
-               return cached;
-       }
-
-       protected String getUniqueName () {
-               return uniqueName;
-       }
-
-       public void addInput (FlinkVertex vertex, int position) {
-               if (inputPositions.containsKey(vertex)) {
-                       inputPositions.get(vertex).add(position);
-               }
-               else {
-                       ArrayList<Integer> lst = new ArrayList<Integer>();
-                       lst.add(position);
-                       inputPositions.put(vertex,lst);
-               }
-       }
-
-       public void addNumberOfSubTasksInOutput (int subTasks, int position) {
-               if (numberOfSubTasksInOutputs.isEmpty()) {
-                       numberOfSubTasksInOutputs.add(-1);
-               }
-               int currSize = numberOfSubTasksInOutputs.size();
-               for (int i = currSize; i <= position; i++) {
-                       numberOfSubTasksInOutputs.add(i, -1);
-               }
-               numberOfSubTasksInOutputs.set(position, subTasks);
-       }
-
-       // Must be called before taskConfig is written to Tez configuration
-       protected void writeInputPositionsToConfig () {
-               HashMap<String,ArrayList<Integer>> toWrite = new HashMap<String, ArrayList<Integer>>();
-               for (FlinkVertex v: inputPositions.keySet()) {
-                       String name = v.getUniqueName();
-                       List<Integer> positions = inputPositions.get(v);
-                       toWrite.put(name, new ArrayList<Integer>(positions));
-               }
-               this.taskConfig.setInputPositions(toWrite);
-       }
-
-       // Must be called before taskConfig is written to Tez configuration
-       protected void writeSubTasksInOutputToConfig () {
-               this.taskConfig.setNumberSubtasksInOutput(this.numberOfSubTasksInOutputs);
-       }
-
-}
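
One subtlety in addNumberOfSubTasksInOutput() is that the list is padded with -1 up to the requested position before the value is set, so output positions can be registered out of order. A small illustration (the values are hypothetical):

    // Registering position 2 first pads positions 0 and 1 with -1.
    vertex.addNumberOfSubTasksInOutput(4, 2);  // list becomes [-1, -1, 4]
    vertex.addNumberOfSubTasksInOutput(8, 0);  // list becomes [ 8, -1, 4]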

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
deleted file mode 100644
index 52f39be..0000000
--- a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java
+++ /dev/null
@@ -1,460 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.dag;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.distributions.DataDistribution;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.InputSplit;
-import org.apache.flink.optimizer.CompilerException;
-import org.apache.flink.optimizer.dag.TempMode;
-import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
-import org.apache.flink.optimizer.plan.Channel;
-import org.apache.flink.optimizer.plan.DualInputPlanNode;
-import org.apache.flink.optimizer.plan.NAryUnionPlanNode;
-import org.apache.flink.optimizer.plan.OptimizedPlan;
-import org.apache.flink.optimizer.plan.PlanNode;
-import org.apache.flink.optimizer.plan.SingleInputPlanNode;
-import org.apache.flink.optimizer.plan.SinkPlanNode;
-import org.apache.flink.optimizer.plan.SourcePlanNode;
-import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.tez.runtime.TezTaskConfig;
-import org.apache.flink.util.Visitor;
-import org.apache.tez.dag.api.DAG;
-import org.apache.tez.dag.api.TezConfiguration;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-
-public class TezDAGGenerator implements Visitor<PlanNode> {
-
-       private static final Log LOG = LogFactory.getLog(TezDAGGenerator.class);
-       
-       private Map<PlanNode, FlinkVertex> vertices; // a map from optimizer nodes to Tez vertices
-       private List<FlinkEdge> edges;
-       private final int defaultMaxFan;
-       private final TezConfiguration tezConf;
-
-       private final float defaultSortSpillingThreshold;
-
-       public TezDAGGenerator (TezConfiguration tezConf, Configuration config) {
-               this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY,
-                               ConfigConstants.DEFAULT_SPILLING_MAX_FAN);
-               this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY,
-                               ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD);
-               this.tezConf = tezConf;
-       }
-
-       public DAG createDAG (OptimizedPlan program) throws Exception {
-               LOG.info ("Creating Tez DAG");
-               this.vertices = new HashMap<PlanNode, FlinkVertex>();
-               this.edges = new ArrayList<FlinkEdge>();
-               program.accept(this);
-
-               DAG dag = DAG.create(program.getJobName());
-               for (FlinkVertex v : vertices.values()) {
-                       dag.addVertex(v.createVertex(new TezConfiguration(tezConf)));
-               }
-               for (FlinkEdge e: edges) {
-                       dag.addEdge(e.createEdge(new TezConfiguration(tezConf)));
-               }
-
-               /*
-                * Temporarily throw an error until TEZ-1190 has been fixed or a workaround has been created
-                */
-               if (containsSelfJoins()) {
-                       throw new CompilerException("Dual-input operators with the same input (self-joins) are not yet supported");
-               }
-
-               this.vertices = null;
-               this.edges = null;
-
-               LOG.info ("Tez DAG created");
-               return dag;
-       }
-
-
-       @Override
-       public boolean preVisit(PlanNode node) {
-               if (this.vertices.containsKey(node)) {
-                       // return false to prevent further descent
-                       return false;
-               }
-
-               if ((node instanceof BulkIterationPlanNode) || (node instanceof WorksetIterationPlanNode)) {
-                       throw new CompilerException("Iterations are not yet supported by the Tez execution environment");
-               }
-
-               if ( (node.getBroadcastInputs() != null) && (!node.getBroadcastInputs().isEmpty())) {
-                       throw new CompilerException("Broadcast inputs are not yet supported by the Tez execution environment");
-               }
-
-               FlinkVertex vertex = null;
-
-               try {
-                       if (node instanceof SourcePlanNode) {
-                               vertex = createDataSourceVertex ((SourcePlanNode) node);
-                       }
-                       else if (node instanceof SinkPlanNode) {
-                               vertex = createDataSinkVertex ((SinkPlanNode) node);
-                       }
-                       else if ((node instanceof SingleInputPlanNode)) {
-                               vertex = createSingleInputVertex((SingleInputPlanNode) node);
-                       }
-                       else if (node instanceof DualInputPlanNode) {
-                               vertex = createDualInputVertex((DualInputPlanNode) node);
-                       }
-                       else if (node instanceof NAryUnionPlanNode) {
-                               vertex = createUnionVertex ((NAryUnionPlanNode) node);
-                       }
-                       else {
-                               throw new CompilerException("Unrecognized node type: " + node.getClass().getName());
-                       }
-
-               }
-               catch (Exception e) {
-                       throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e);
-               }
-
-               if (vertex != null) {
-                       this.vertices.put(node, vertex);
-               }
-               return true;
-       }
-
-       @Override
-       public void postVisit (PlanNode node) {
-               try {
-                       if (node instanceof SourcePlanNode) {
-                               return;
-                       }
-                       final Iterator<Channel> inConns = node.getInputs().iterator();
-                       if (!inConns.hasNext()) {
-                               throw new CompilerException("Bug: Found a non-source task with no input.");
-                       }
-                       int inputIndex = 0;
-
-                       FlinkVertex targetVertex = this.vertices.get(node);
-                       TezTaskConfig targetVertexConfig = targetVertex.getConfig();
-
-                       while (inConns.hasNext()) {
-                               Channel input = inConns.next();
-                               inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false);
-                       }
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       throw new CompilerException(
-                                       "An error occurred while translating the optimized plan to a Tez DAG: " + e.getMessage(), e);
-               }
-       }
-
-       private FlinkVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException, IOException {
-
-               final String taskName = node.getNodeName();
-               final DriverStrategy ds = node.getDriverStrategy();
-               final int dop = node.getParallelism();
-
-               final TezTaskConfig config = new TezTaskConfig(new Configuration());
-
-               config.setDriver(ds.getDriverClass());
-               config.setDriverStrategy(ds);
-               config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-               config.setStubParameters(node.getProgramOperator().getParameters());
-
-               for (int i = 0; i < ds.getNumRequiredComparators(); i++) {
-                       config.setDriverComparator(node.getComparator(i), i);
-               }
-               assignDriverResources(node, config);
-
-               return new FlinkProcessorVertex(taskName, dop, config);
-       }
-
-       private FlinkVertex createDualInputVertex(DualInputPlanNode node) 
throws CompilerException, IOException {
-               final String taskName = node.getNodeName();
-               final DriverStrategy ds = node.getDriverStrategy();
-               final int dop = node.getParallelism();
-
-               final TezTaskConfig config = new TezTaskConfig(new Configuration());
-
-               config.setDriver(ds.getDriverClass());
-               config.setDriverStrategy(ds);
-               
config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-               
config.setStubParameters(node.getProgramOperator().getParameters());
-
-               if (node.getComparator1() != null) {
-                       config.setDriverComparator(node.getComparator1(), 0);
-               }
-               if (node.getComparator2() != null) {
-                       config.setDriverComparator(node.getComparator2(), 1);
-               }
-               if (node.getPairComparator() != null) {
-                       
config.setDriverPairComparator(node.getPairComparator());
-               }
-
-               assignDriverResources(node, config);
-
-               LOG.info("Creating processor vertex " + taskName + " with 
parallelism " + dop);
-
-               return new FlinkProcessorVertex(taskName, dop, config);
-       }
-
-       private FlinkVertex createDataSinkVertex(SinkPlanNode node) throws 
CompilerException, IOException {
-               final String taskName = node.getNodeName();
-               final int dop = node.getParallelism();
-
-               final TezTaskConfig config = new TezTaskConfig(new 
Configuration());
-
-               // set user code
-               
config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-               
config.setStubParameters(node.getProgramOperator().getParameters());
-
-               LOG.info("Creating data sink vertex " + taskName + " with 
parallelism " + dop);
-               
-               return new FlinkDataSinkVertex(taskName, dop, config);
-       }
-
-       private FlinkVertex createDataSourceVertex(SourcePlanNode node) throws 
CompilerException, IOException {
-               final String taskName = node.getNodeName();
-               int dop = node.getParallelism();
-
-               final TezTaskConfig config = new TezTaskConfig(new Configuration());
-
-               
config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
-               
config.setStubParameters(node.getProgramOperator().getParameters());
-
-               InputFormat format = 
node.getDataSourceNode().getOperator().getFormatWrapper().getUserCodeObject();
-
-               config.setInputFormat(format);
-
-               // Create as many data source tasks as there are input splits
-               InputSplit[] splits = format.createInputSplits((dop > 0) ? dop : 1);
-               dop = splits.length;
-
-               LOG.info("Creating data source vertex " + taskName + " with 
parallelism " + dop);
-               
-               return new FlinkDataSourceVertex(taskName, dop, config);
-       }
-
-       private FlinkVertex createUnionVertex(NAryUnionPlanNode node) throws 
CompilerException, IOException {
-               final String taskName = node.getNodeName();
-               final int dop = node.getParallelism();
-               final TezTaskConfig config = new TezTaskConfig(new Configuration());
-
-               LOG.info("Creating union vertex " + taskName + " with 
parallelism " + dop);
-               
-               return new FlinkUnionVertex(taskName, dop, config);
-       }
-
-
-       private void assignDriverResources(PlanNode node, TaskConfig config) {
-               final double relativeMem = node.getRelativeMemoryPerSubTask();
-               if (relativeMem > 0) {
-                       config.setRelativeMemoryDriver(relativeMem);
-                       config.setFilehandlesDriver(this.defaultMaxFan);
-                       
config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold);
-               }
-       }
-
-       private void assignLocalStrategyResources(Channel c, TaskConfig config, 
int inputNum) {
-               if (c.getRelativeMemoryLocalStrategy() > 0) {
-                       config.setRelativeMemoryInput(inputNum, 
c.getRelativeMemoryLocalStrategy());
-                       config.setFilehandlesInput(inputNum, 
this.defaultMaxFan);
-                       config.setSpillingThresholdInput(inputNum, 
this.defaultSortSpillingThreshold);
-               }
-       }
-
-       private int translateChannel(Channel input, int inputIndex, FlinkVertex 
targetVertex,
-                                                               TezTaskConfig 
targetVertexConfig, boolean isBroadcast) throws Exception
-       {
-               final PlanNode inputPlanNode = input.getSource();
-               final Iterator<Channel> allInChannels = Collections.singletonList(input).iterator();
-
-               // check that the type serializer is consistent
-               TypeSerializerFactory<?> typeSerFact = null;
-
-               while (allInChannels.hasNext()) {
-                       final Channel inConn = allInChannels.next();
-
-                       if (typeSerFact == null) {
-                               typeSerFact = inConn.getSerializer();
-                       } else if (!typeSerFact.equals(inConn.getSerializer())) 
{
-                               throw new CompilerException("Conflicting types 
in union operator.");
-                       }
-
-                       final PlanNode sourceNode = inConn.getSource();
-                       FlinkVertex sourceVertex = 
this.vertices.get(sourceNode);
-                       TezTaskConfig sourceVertexConfig = 
sourceVertex.getConfig(); //TODO ??? need to create a new TezConfig ???
-
-                       connectJobVertices(
-                                       inConn, inputIndex, sourceVertex, 
sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast);
-               }
-
-               // The local strategy is added only once. In the non-union case it is added to the actual edge;
-               // in the union case, to the edge between the union and the target node.
-               addLocalInfoFromChannelToConfig(input, targetVertexConfig, 
inputIndex, isBroadcast);
-               return 1;
-       }
-
-       private void connectJobVertices(Channel channel, int inputNumber,
-                                                       final FlinkVertex 
sourceVertex, final TezTaskConfig sourceConfig,
-                                                       final FlinkVertex 
targetVertex, final TezTaskConfig targetConfig, boolean isBroadcast)
-                       throws CompilerException {
-
-               // -------------- configure the source task's ship strategy in the task config --------------
-               final int outputIndex = sourceConfig.getNumOutputs();
-               sourceConfig.addOutputShipStrategy(channel.getShipStrategy());
-               if (outputIndex == 0) {
-                       
sourceConfig.setOutputSerializer(channel.getSerializer());
-               }
-               if (channel.getShipStrategyComparator() != null) {
-                       
sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), 
outputIndex);
-               }
-
-               if (channel.getShipStrategy() == 
ShipStrategyType.PARTITION_RANGE) {
-
-                       final DataDistribution dataDistribution = 
channel.getDataDistribution();
-                       if(dataDistribution != null) {
-                               
sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex);
-                       } else {
-                               throw new RuntimeException("Range partitioning 
requires data distribution");
-                               // TODO: inject code and configuration for 
automatic histogram generation
-                       }
-               }
-
-               // ---------------- configure the receiver -------------------
-               if (isBroadcast) {
-                       targetConfig.addBroadcastInputToGroup(inputNumber);
-               } else {
-                       targetConfig.addInputToGroup(inputNumber);
-               }
-
-               //----------------- connect source and target with edge 
------------------------------
-
-               FlinkEdge edge;
-               ShipStrategyType shipStrategy = channel.getShipStrategy();
-               TypeSerializer<?> serializer = 
channel.getSerializer().getSerializer();
-               if ((shipStrategy == ShipStrategyType.FORWARD) || (shipStrategy 
== ShipStrategyType.NONE)) {
-                       edge = new FlinkForwardEdge(sourceVertex, targetVertex, 
serializer);
-                       // For forward edges, give the target operator the same parallelism as the source operator
-                       targetVertex.setParallelism(sourceVertex.getParallelism());
-               }
-               else if (shipStrategy == ShipStrategyType.BROADCAST) {
-                       edge = new FlinkBroadcastEdge(sourceVertex, 
targetVertex, serializer);
-               }
-               else if (shipStrategy == ShipStrategyType.PARTITION_HASH) {
-                       edge = new FlinkPartitionEdge(sourceVertex, 
targetVertex, serializer);
-               }
-               else {
-                       throw new CompilerException("Ship strategy between 
nodes " + sourceVertex.getVertex().getName() + " and " + 
targetVertex.getVertex().getName() + " currently not supported");
-               }
-
-               // Tez-specific bookkeeping
-               // TODO: This probably will not work for vertices with multiple 
outputs
-               
sourceVertex.addNumberOfSubTasksInOutput(targetVertex.getParallelism(), 
outputIndex);
-               targetVertex.addInput(sourceVertex, inputNumber);
-
-
-               edges.add(edge);
-       }
-
-       private void addLocalInfoFromChannelToConfig(Channel channel, 
TaskConfig config, int inputNum, boolean isBroadcastChannel) {
-               // serializer
-               if (isBroadcastChannel) {
-                       
config.setBroadcastInputSerializer(channel.getSerializer(), inputNum);
-
-                       if (channel.getLocalStrategy() != LocalStrategy.NONE || 
(channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) {
-                               throw new CompilerException("Found local 
strategy or temp mode on a broadcast variable channel.");
-                       } else {
-                               return;
-                       }
-               } else {
-                       config.setInputSerializer(channel.getSerializer(), 
inputNum);
-               }
-
-               // local strategy
-               if (channel.getLocalStrategy() != LocalStrategy.NONE) {
-                       config.setInputLocalStrategy(inputNum, 
channel.getLocalStrategy());
-                       if (channel.getLocalStrategyComparator() != null) {
-                               
config.setInputComparator(channel.getLocalStrategyComparator(), inputNum);
-                       }
-               }
-
-               assignLocalStrategyResources(channel, config, inputNum);
-
-               // materialization / caching
-               if (channel.getTempMode() != null) {
-                       final TempMode tm = channel.getTempMode();
-
-                       boolean needsMemory = false;
-                       if (tm.breaksPipeline()) {
-                               
config.setInputAsynchronouslyMaterialized(inputNum, true);
-                               needsMemory = true;
-                       }
-                       if (tm.isCached()) {
-                               config.setInputCached(inputNum, true);
-                               needsMemory = true;
-                       }
-
-                       if (needsMemory) {
-                               // sanity check
-                               if (tm == null || tm == TempMode.NONE || 
channel.getRelativeTempMemory() <= 0) {
-                                       throw new CompilerException("Bug in 
compiler: Inconsistent description of input materialization.");
-                               }
-                               
config.setRelativeInputMaterializationMemory(inputNum, 
channel.getRelativeTempMemory());
-                       }
-               }
-       }
-
-       private boolean containsSelfJoins() {
-               for (FlinkVertex v : vertices.values()) {
-                       ArrayList<FlinkVertex> predecessors = new 
ArrayList<FlinkVertex>();
-                       for (FlinkEdge e : edges) {
-                               if (e.target == v) {
-                                       if (predecessors.contains(e.source)) {
-                                               return true;
-                                       }
-                                       predecessors.add(e.source);
-                               }
-                       }
-               }
-               return false;
-       }
-
-}

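For reference, the translator deleted above follows a two-phase visitor over the optimized plan: preVisit creates one vertex per plan node (source, sink, single-input, dual-input, or union), and postVisit wires each input channel into an edge. A minimal standalone sketch of that pattern, with hypothetical types standing in for Flink's PlanNode and FlinkVertex, is:

    import java.util.ArrayList;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified two-phase translation: nodes become vertices first,
    // then every input connection becomes an edge. Names are hypothetical.
    class PlanToDagSketch {
        private final Map<String, List<String>> incomingEdges = new LinkedHashMap<>();

        // preVisit analogue: register a vertex once per node.
        boolean preVisit(String node) {
            if (incomingEdges.containsKey(node)) {
                return false; // already translated, do not descend again
            }
            incomingEdges.put(node, new ArrayList<>());
            return true;
        }

        // postVisit analogue: connect every input of the node.
        void postVisit(String node, List<String> inputs) {
            for (String source : inputs) {
                incomingEdges.get(node).add(source); // edge: source -> node
            }
        }
    }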
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
deleted file mode 100644
index 707fd47..0000000
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.ConnectedComponentsData;
-import org.apache.flink.util.Collector;
-
-
-public class ConnectedComponentsStep implements ProgramDescription {
-
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String... args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               // set up execution environment
-               ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               // read vertex and edge data
-               DataSet<Long> vertices = getVertexDataSet(env);
-               DataSet<Tuple2<Long, Long>> edges = 
getEdgeDataSet(env).flatMap(new UndirectEdge());
-
-               // assign the initial components (equal to the vertex id)
-               DataSet<Tuple2<Long, Long>> verticesWithInitialId = 
vertices.map(new DuplicateValue<Long>());
-
-               DataSet<Tuple2<Long, Long>> nextComponents = verticesWithInitialId
-                               .join(edges)
-                               .where(0).equalTo(0)
-                               .with(new NeighborWithComponentIDJoin())
-                               .groupBy(0).aggregate(Aggregations.MIN, 1)
-                               .join(verticesWithInitialId)
-                               .where(0).equalTo(0)
-                               .with(new ComponentIdFilter());
-
-
-               // emit result
-               if (fileOutput) {
-                       nextComponents.writeAsCsv(outputPath, "\n", " ");
-               } else {
-                       nextComponents.print();
-               }
-
-               // execute program
-               env.execute("Connected Components Example");
-       }
-
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * Function that turns a value into a 2-tuple where both fields are 
that value.
-        */
-       @ForwardedFields("*->f0")
-       public static final class DuplicateValue<T> implements MapFunction<T, 
Tuple2<T, T>> {
-
-               @Override
-               public Tuple2<T, T> map(T vertex) {
-                       return new Tuple2<T, T>(vertex, vertex);
-               }
-       }
-
-       /**
-        * Undirects the edges by emitting, for each input edge, the edge itself and an inverted copy.
-        */
-       public static final class UndirectEdge implements 
FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
-               Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>();
-
-               @Override
-               public void flatMap(Tuple2<Long, Long> edge, 
Collector<Tuple2<Long, Long>> out) {
-                       invertedEdge.f0 = edge.f1;
-                       invertedEdge.f1 = edge.f0;
-                       out.collect(edge);
-                       out.collect(invertedEdge);
-               }
-       }
-
-       /**
-        * UDF that joins a (Vertex-ID, Component-ID) pair, representing the component that a vertex
-        * is currently associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function
-        * produces a (Target-Vertex-ID, Component-ID) pair.
-        */
-       @ForwardedFieldsFirst("f1->f1")
-       @ForwardedFieldsSecond("f1->f0")
-       public static final class NeighborWithComponentIDJoin implements 
JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-               @Override
-               public Tuple2<Long, Long> join(Tuple2<Long, Long> 
vertexWithComponent, Tuple2<Long, Long> edge) {
-                       return new Tuple2<Long, Long>(edge.f1, 
vertexWithComponent.f1);
-               }
-       }
-
-
-
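-       /**
-        * Emits the candidate (Vertex-ID, Component-ID) pair only if the candidate's component ID
-        * is smaller than the vertex's current component ID.
-        */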
-       @ForwardedFieldsFirst("*")
-       public static final class ComponentIdFilter implements 
FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
-
-               @Override
-               public void join(Tuple2<Long, Long> candidate, Tuple2<Long, 
Long> old, Collector<Tuple2<Long, Long>> out) {
-                       if (candidate.f1 < old.f1) {
-                               out.collect(candidate);
-                       }
-               }
-       }
-
-
-
-       @Override
-       public String getDescription() {
-               return "Parameters: <vertices-path> <edges-path> <result-path> 
<max-number-of-iterations>";
-       }
-
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String verticesPath = null;
-       private static String edgesPath = null;
-       private static String outputPath = null;
-       private static int maxIterations = 10;
-
-       private static boolean parseParameters(String[] programArguments) {
-
-               if(programArguments.length > 0) {
-                       // parse input arguments
-                       fileOutput = true;
-                       if(programArguments.length == 4) {
-                               verticesPath = programArguments[0];
-                               edgesPath = programArguments[1];
-                               outputPath = programArguments[2];
-                               maxIterations = 
Integer.parseInt(programArguments[3]);
-                       } else {
-                               System.err.println("Usage: ConnectedComponents 
<vertices path> <edges path> <result path> <max number of iterations>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing Connected Components 
example with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: ConnectedComponents 
<vertices path> <edges path> <result path> <max number of iterations>");
-               }
-               return true;
-       }
-
-       private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) 
{
-
-               if(fileOutput) {
-                       return env.readCsvFile(verticesPath).types(Long.class)
-                                       .map(
-                                                       new 
MapFunction<Tuple1<Long>, Long>() {
-                                                               public Long 
map(Tuple1<Long> value) { return value.f0; }
-                                                       });
-               } else {
-                       return 
ConnectedComponentsData.getDefaultVertexDataSet(env);
-               }
-       }
-
-       private static DataSet<Tuple2<Long, Long>> 
getEdgeDataSet(ExecutionEnvironment env) {
-
-               if (fileOutput) {
-                       return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class);
-               } else {
-                       return 
ConnectedComponentsData.getDefaultEdgeDataSet(env);
-               }
-       }
-
-
-}

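For reference, one step of the deleted pipeline above amounts to: propagate each vertex's component ID across its edges, take the MIN candidate per target vertex, and accept a candidate only if it beats the old ID. A minimal sketch of the same step over plain collections (a hypothetical helper, not the DataSet API; it assumes every edge endpoint appears in the component map and that both directions of each undirected edge are present, as UndirectEdge guarantees):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // componentOf maps vertex -> current component ID.
    class CcStepSketch {
        static Map<Long, Long> step(Map<Long, Long> componentOf, List<long[]> edges) {
            // Join each edge with the source's component, MIN-aggregate per target:
            // the NeighborWithComponentIDJoin + groupBy(0).aggregate(MIN, 1) analogue.
            Map<Long, Long> candidate = new HashMap<>();
            for (long[] e : edges) {
                candidate.merge(e[1], componentOf.get(e[0]), Long::min);
            }
            // ComponentIdFilter analogue: accept a candidate only if it improves.
            Map<Long, Long> next = new HashMap<>(componentOf);
            candidate.forEach((vertex, comp) -> {
                if (comp < next.get(vertex)) {
                    next.put(vertex, comp);
                }
            });
            return next;
        }
    }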
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
deleted file mode 100644
index c65fb69..0000000
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.hadoop.util.ProgramDriver;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.dag.api.client.DAGClient;
-import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.dag.api.client.Progress;
-import org.apache.tez.dag.api.client.StatusGetOpts;
-import org.apache.tez.dag.api.client.VertexStatus;
-
-import java.io.IOException;
-import java.text.DecimalFormat;
-import java.util.EnumSet;
-import java.util.Set;
-
-public class ExampleDriver {
-
-       private static final DecimalFormat formatter = new 
DecimalFormat("###.##%");
-
-       public static void main(String[] args) {
-               int exitCode = -1;
-               ProgramDriver pgd = new ProgramDriver();
-               try {
-                       pgd.addClass("wc", WordCount.class,
-                                       "Wordcount");
-                       pgd.addClass("tpch3", TPCHQuery3.class,
-                                       "Modified TPC-H 3 query");
-                       pgd.addClass("tc", TransitiveClosureNaiveStep.class,
-                                       "One step of transitive closure");
-                       pgd.addClass("pr", PageRankBasicStep.class,
-                                       "One step of PageRank");
-                       pgd.addClass("cc", ConnectedComponentsStep.class,
-                                       "One step of connected components");
-                       exitCode = pgd.run(args);
-               } catch(Throwable e){
-                       e.printStackTrace();
-               }
-               System.exit(exitCode);
-       }
-
-       public static void printDAGStatus(DAGClient dagClient, String[] 
vertexNames)
-                       throws IOException, TezException {
-               printDAGStatus(dagClient, vertexNames, false, false);
-       }
-
-       public static void printDAGStatus(DAGClient dagClient, String[] 
vertexNames, boolean displayDAGCounters, boolean displayVertexCounters)
-                       throws IOException, TezException {
-               Set<StatusGetOpts> opts = 
EnumSet.of(StatusGetOpts.GET_COUNTERS);
-               DAGStatus dagStatus = dagClient.getDAGStatus(
-                               (displayDAGCounters ? opts : null));
-               Progress progress = dagStatus.getDAGProgress();
-               double vProgressFloat = 0.0f;
-               if (progress != null) {
-                       System.out.println("");
-                       System.out.println("DAG: State: "
-                                       + dagStatus.getState()
-                                       + " Progress: "
-                                       + (progress.getTotalTaskCount() <= 0 ? formatter.format(0.0f) :
-                                       formatter.format((double) progress.getSucceededTaskCount()
-                                                       / progress.getTotalTaskCount())));
-                       for (String vertexName : vertexNames) {
-                               VertexStatus vStatus = 
dagClient.getVertexStatus(vertexName,
-                                               (displayVertexCounters ? opts : 
null));
-                               if (vStatus == null) {
-                                       System.out.println("Could not retrieve 
status for vertex: "
-                                                       + vertexName);
-                                       continue;
-                               }
-                               Progress vProgress = vStatus.getProgress();
-                               if (vProgress != null) {
-                                       vProgressFloat = 0.0f;
-                                       if (vProgress.getTotalTaskCount() == 0) 
{
-                                               vProgressFloat = 1.0f;
-                                       } else if 
(vProgress.getTotalTaskCount() > 0) {
-                                               vProgressFloat = 
(double)vProgress.getSucceededTaskCount()
-                                                               
/vProgress.getTotalTaskCount();
-                                       }
-                                       System.out.println("VertexStatus:"
-                                                       + " VertexName: "
-                                                       + 
(vertexName.equals("ivertex1") ? "intermediate-reducer"
-                                                       : vertexName)
-                                                       + " Progress: " + 
formatter.format(vProgressFloat));
-                               }
-                               if (displayVertexCounters) {
-                                       TezCounters counters = 
vStatus.getVertexCounters();
-                                       if (counters != null) {
-                                               System.out.println("Vertex 
Counters for " + vertexName + ": "
-                                                               + counters);
-                                       }
-                               }
-                       }
-               }
-               if (displayDAGCounters) {
-                       TezCounters counters = dagStatus.getDAGCounters();
-                       if (counters != null) {
-                               System.out.println("DAG Counters: " + counters);
-                       }
-               }
-       }
-}

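One detail worth noting in the deleted driver above: a DecimalFormat pattern ending in '%' multiplies by 100 itself, which is why printDAGStatus passes the raw succeeded/total fraction straight to the formatter. A quick standalone check (values are illustrative):

    import java.text.DecimalFormat;

    class ProgressFormatSketch {
        public static void main(String[] args) {
            DecimalFormat formatter = new DecimalFormat("###.##%");
            int succeeded = 3, total = 8;
            // The '%' suffix scales by 100 during formatting.
            System.out.println(formatter.format((double) succeeded / total)); // prints 37.5%
        }
    }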
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
deleted file mode 100644
index 031893d..0000000
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.examples.java.graph.util.PageRankData;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
-
-public class PageRankBasicStep {
-
-       private static final double DAMPENING_FACTOR = 0.85;
-       private static final double EPSILON = 0.0001;
-
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               // set up execution environment
-               final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
-
-               // get input data
-               DataSet<Long> pagesInput = getPagesDataSet(env);
-               DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
-
-               // assign initial rank to pages
-               DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
-                               map(new RankAssigner((1.0d / numPages)));
-
-               // build adjacency list from link input
-               DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
-                               linksInput.groupBy(0).reduceGroup(new 
BuildOutgoingEdgeList());
-
-               DataSet<Tuple2<Long, Double>> newRanks = pagesWithRanks
-                               .join(adjacencyListInput).where(0).equalTo(0)
-                               .flatMap(new JoinVertexWithEdgesMatch())
-                               .groupBy(0).aggregate(SUM, 1)
-                               .map(new Dampener(DAMPENING_FACTOR, numPages));
-
-
-               // emit result
-               if(fileOutput) {
-                       newRanks.writeAsCsv(outputPath, "\n", " ");
-               } else {
-                       newRanks.print();
-               }
-
-               // execute program
-               env.execute("Basic Page Rank Example");
-
-       }
-
-       // 
*************************************************************************
-       //     USER FUNCTIONS
-       // 
*************************************************************************
-
-       /**
-        * A map function that assigns an initial rank to all pages.
-        */
-       public static final class RankAssigner implements MapFunction<Long, 
Tuple2<Long, Double>> {
-               Tuple2<Long, Double> outPageWithRank;
-
-               public RankAssigner(double rank) {
-                       this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank);
-               }
-
-               @Override
-               public Tuple2<Long, Double> map(Long page) {
-                       outPageWithRank.f0 = page;
-                       return outPageWithRank;
-               }
-       }
-
-       /**
-        * A reduce function that takes a sequence of edges and builds the 
adjacency list for the vertex where the edges
-        * originate. Run as a pre-processing step.
-        */
-       @ForwardedFields("0")
-       public static final class BuildOutgoingEdgeList implements 
GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {
-
-               private final ArrayList<Long> neighbors = new ArrayList<Long>();
-
-               @Override
-               public void reduce(Iterable<Tuple2<Long, Long>> values, 
Collector<Tuple2<Long, Long[]>> out) {
-                       neighbors.clear();
-                       Long id = 0L;
-
-                       for (Tuple2<Long, Long> n : values) {
-                               id = n.f0;
-                               neighbors.add(n.f1);
-                       }
-                       out.collect(new Tuple2<Long, Long[]>(id, 
neighbors.toArray(new Long[neighbors.size()])));
-               }
-       }
-
-       /**
-        * Join function that distributes a fraction of a vertex's rank to all 
neighbors.
-        */
-       public static final class JoinVertexWithEdgesMatch implements 
FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, 
Tuple2<Long, Double>> {
-
-               @Override
-               public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Long[]>> value, Collector<Tuple2<Long, Double>> out){
-                       Long[] neighbors = value.f1.f1;
-                       double rank = value.f0.f1;
-                       double rankToDistribute = rank / ((double) neighbors.length);
-
-                       for (int i = 0; i < neighbors.length; i++) {
-                               out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute));
-                       }
-               }
-       }
-
-       /**
-        * The function that applies the PageRank dampening formula.
-        */
-       @ForwardedFields("0")
-       public static final class Dampener implements 
MapFunction<Tuple2<Long,Double>, Tuple2<Long,Double>> {
-
-               private final double dampening;
-               private final double randomJump;
-
-               public Dampener(double dampening, double numVertices) {
-                       this.dampening = dampening;
-                       this.randomJump = (1 - dampening) / numVertices;
-               }
-
-               @Override
-               public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
-                       value.f1 = (value.f1 * dampening) + randomJump;
-                       return value;
-               }
-       }
-
-       /**
-        * Filter that keeps only those vertices whose rank changed by more than the threshold EPSILON.
-        */
-       public static final class EpsilonFilter implements 
FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {
-
-               @Override
-               public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, 
Double>> value) {
-                       return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
-               }
-       }
-
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-
-       private static boolean fileOutput = false;
-       private static String pagesInputPath = null;
-       private static String linksInputPath = null;
-       private static String outputPath = null;
-       private static long numPages = 0;
-       private static int maxIterations = 10;
-
-       private static boolean parseParameters(String[] args) {
-
-               if(args.length > 0) {
-                       if(args.length == 5) {
-                               fileOutput = true;
-                               pagesInputPath = args[0];
-                               linksInputPath = args[1];
-                               outputPath = args[2];
-                               numPages = Long.parseLong(args[3]);
-                               maxIterations = Integer.parseInt(args[4]);
-                       } else {
-                               System.err.println("Usage: PageRankBasic <pages 
path> <links path> <output path> <num pages> <num iterations>");
-                               return false;
-                       }
-               } else {
-                       System.out.println("Executing PageRank Basic example 
with default parameters and built-in default data.");
-                       System.out.println("  Provide parameters to read input 
data from files.");
-                       System.out.println("  See the documentation for the 
correct format of input files.");
-                       System.out.println("  Usage: PageRankBasic <pages path> 
<links path> <output path> <num pages> <num iterations>");
-
-                       numPages = PageRankData.getNumberOfPages();
-               }
-               return true;
-       }
-
-       private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env
-                                       .readCsvFile(pagesInputPath)
-                                       .fieldDelimiter(' ')
-                                       .lineDelimiter("\n")
-                                       .types(Long.class)
-                                       .map(new MapFunction<Tuple1<Long>, 
Long>() {
-                                               @Override
-                                               public Long map(Tuple1<Long> v) 
{ return v.f0; }
-                                       });
-               } else {
-                       return PageRankData.getDefaultPagesDataSet(env);
-               }
-       }
-
-       private static DataSet<Tuple2<Long, Long>> 
getLinksDataSet(ExecutionEnvironment env) {
-               if(fileOutput) {
-                       return env.readCsvFile(linksInputPath)
-                                       .fieldDelimiter(' ')
-                                       .lineDelimiter("\n")
-                                       .types(Long.class, Long.class);
-               } else {
-                       return PageRankData.getDefaultEdgeDataSet(env);
-               }
-       }
-}

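The Dampener in the deleted example above applies rank' = dampening * summedContributions + (1 - dampening) / numPages, where summedContributions is the output of groupBy(0).aggregate(SUM, 1). A tiny standalone check of the formula with illustrative values:

    class DampenerSketch {
        public static void main(String[] args) {
            double dampening = 0.85, numPages = 10.0;
            double summedContributions = 0.2;  // illustrative SUM of neighbor contributions
            // rank' = d * sum + (1 - d) / N
            double newRank = summedContributions * dampening + (1 - dampening) / numPages;
            System.out.println(newRank);       // ~0.185
        }
    }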
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
 
b/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
deleted file mode 100644
index d61f80e..0000000
--- 
a/flink-staging/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tez.examples;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.tez.client.RemoteTezEnvironment;
-
-import java.text.DateFormat;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-public class TPCHQuery3 {
-
-       // 
*************************************************************************
-       //     PROGRAM
-       // 
*************************************************************************
-
-       public static void main(String[] args) throws Exception {
-
-               if(!parseParameters(args)) {
-                       return;
-               }
-
-               final RemoteTezEnvironment env = RemoteTezEnvironment.create();
-               env.setParallelism(400);
-
-
-               // get input data
-               DataSet<Lineitem> lineitems = getLineitemDataSet(env);
-               DataSet<Order> orders = getOrdersDataSet(env);
-               DataSet<Customer> customers = getCustomerDataSet(env);
-
-               // Filter market segment "AUTOMOBILE"
-               customers = customers.filter(
-                               new FilterFunction<Customer>() {
-                                       @Override
-                                       public boolean filter(Customer c) {
-                                               return 
c.getMktsegment().equals("AUTOMOBILE");
-                                       }
-                               });
-
-               // Filter all Orders with o_orderdate < 12.03.1995
-               orders = orders.filter(
-                               new FilterFunction<Order>() {
-                                       private final DateFormat format = new 
SimpleDateFormat("yyyy-MM-dd");
-                                       private final Date date = 
format.parse("1995-03-12");
-
-                                       @Override
-                                       public boolean filter(Order o) throws 
ParseException {
-                                               return 
format.parse(o.getOrderdate()).before(date);
-                                       }
-                               });
-
-               // Filter all Lineitems with l_shipdate > 12.03.1995
-               lineitems = lineitems.filter(
-                               new FilterFunction<Lineitem>() {
-                                       private final DateFormat format = new 
SimpleDateFormat("yyyy-MM-dd");
-                                       private final Date date = 
format.parse("1995-03-12");
-
-                                       @Override
-                                       public boolean filter(Lineitem l) 
throws ParseException {
-                                               return 
format.parse(l.getShipdate()).after(date);
-                                       }
-                               });
-
-               // Join customers with orders and package them into a 
ShippingPriorityItem
-               DataSet<ShippingPriorityItem> customerWithOrders =
-                               customers.join(orders).where(0).equalTo(1)
-                                               .with(
-                                                               new 
JoinFunction<Customer, Order, ShippingPriorityItem>() {
-                                                                       
@Override
-                                                                       public 
ShippingPriorityItem join(Customer c, Order o) {
-                                                                               
return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(),
-                                                                               
                o.getShippriority());
-                                                                       }
-                                                               });
-
-               // Join the last join result with Lineitems
-               DataSet<ShippingPriorityItem> result =
-                               
customerWithOrders.join(lineitems).where(0).equalTo(0)
-                                               .with(
-                                                               new 
JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() {
-                                                                       
@Override
-                                                                       public 
ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) {
-                                                                               
i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount()));
-                                                                               
return i;
-                                                                       }
-                                                               })
-                                                               // Group by 
l_orderkey, o_orderdate and o_shippriority and compute revenue sum
-                                               .groupBy(0, 2, 3)
-                                               .aggregate(Aggregations.SUM, 1);
-
-               // emit result
-               result.writeAsCsv(outputPath, "\n", "|");
-
-               // execute program
-               env.registerMainClass(TPCHQuery3.class);
-               env.execute("TPCH Query 3 Example");
-
-       }
-
-       // 
*************************************************************************
-       //     DATA TYPES
-       // 
*************************************************************************
-
-       public static class Lineitem extends Tuple4<Integer, Double, Double, 
String> {
-
-               public Integer getOrderkey() { return this.f0; }
-               public Double getDiscount() { return this.f2; }
-               public Double getExtendedprice() { return this.f1; }
-               public String getShipdate() { return this.f3; }
-       }
-
-       public static class Customer extends Tuple2<Integer, String> {
-
-               public Integer getCustKey() { return this.f0; }
-               public String getMktsegment() { return this.f1; }
-       }
-
-       public static class Order extends Tuple4<Integer, Integer, String, 
Integer> {
-
-               public Integer getOrderKey() { return this.f0; }
-               public Integer getCustKey() { return this.f1; }
-               public String getOrderdate() { return this.f2; }
-               public Integer getShippriority() { return this.f3; }
-       }
-
-       public static class ShippingPriorityItem extends Tuple4<Integer, 
Double, String, Integer> {
-
-               public ShippingPriorityItem() { }
-
-               public ShippingPriorityItem(Integer o_orderkey, Double revenue,
-                                                                       String 
o_orderdate, Integer o_shippriority) {
-                       this.f0 = o_orderkey;
-                       this.f1 = revenue;
-                       this.f2 = o_orderdate;
-                       this.f3 = o_shippriority;
-               }
-
-               public Integer getOrderkey() { return this.f0; }
-               public void setOrderkey(Integer orderkey) { this.f0 = orderkey; 
}
-               public Double getRevenue() { return this.f1; }
-               public void setRevenue(Double revenue) { this.f1 = revenue; }
-
-               public String getOrderdate() { return this.f2; }
-               public Integer getShippriority() { return this.f3; }
-       }
-
-       // 
*************************************************************************
-       //     UTIL METHODS
-       // 
*************************************************************************
-
-       private static String lineitemPath;
-       private static String customerPath;
-       private static String ordersPath;
-       private static String outputPath;
-
-       private static boolean parseParameters(String[] programArguments) {
-
-               if(programArguments.length > 0) {
-                       if(programArguments.length == 4) {
-                               lineitemPath = programArguments[0];
-                               customerPath = programArguments[1];
-                               ordersPath = programArguments[2];
-                               outputPath = programArguments[3];
-                       } else {
-                               System.err.println("Usage: TPCHQuery3 
<lineitem-csv path> <customer-csv path> <orders-csv path> <result path>");
-                               return false;
-                       }
-               } else {
-                       System.err.println("This program expects data from the 
TPC-H benchmark as input data.\n" +
-                                       "  Due to legal restrictions, we can 
not ship generated data.\n" +
-                                       "  You can find the TPC-H data 
generator at http://www.tpc.org/tpch/.\n"; +
-                                       "  Usage: TPCHQuery3 <lineitem-csv 
path> <customer-csv path> <orders-csv path> <result path>");
-                       return false;
-               }
-               return true;
-       }
-
-       private static DataSet<Lineitem> 
getLineitemDataSet(ExecutionEnvironment env) {
-               return env.readCsvFile(lineitemPath)
-                               .fieldDelimiter('|')
-                               .includeFields("1000011000100000")
-                               .tupleType(Lineitem.class);
-       }
-
-       private static DataSet<Customer> 
getCustomerDataSet(ExecutionEnvironment env) {
-               return env.readCsvFile(customerPath)
-                               .fieldDelimiter('|')
-                               .includeFields("10000010")
-                               .tupleType(Customer.class);
-       }
-
-       private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment 
env) {
-               return env.readCsvFile(ordersPath)
-                               .fieldDelimiter('|')
-                               .includeFields("110010010")
-                               .tupleType(Order.class);
-       }
-
-}

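The two date filters in the deleted query above keep orders placed strictly before 1995-03-12 and line items shipped strictly after it, mirroring TPC-H Q3's predicates. An equivalent standalone check (dates are illustrative):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    class DateFilterSketch {
        public static void main(String[] args) throws ParseException {
            SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd");
            Date cutoff = fmt.parse("1995-03-12");
            // Order kept when o_orderdate < cutoff:
            System.out.println(fmt.parse("1995-03-01").before(cutoff)); // true
            // Lineitem kept when l_shipdate > cutoff:
            System.out.println(fmt.parse("1995-04-01").after(cutoff));  // true
        }
    }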