http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java deleted file mode 100644 index 52f39be..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/dag/TezDAGGenerator.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.dag; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.optimizer.CompilerException; -import org.apache.flink.optimizer.dag.TempMode; -import org.apache.flink.optimizer.plan.BulkIterationPlanNode; -import org.apache.flink.optimizer.plan.Channel; -import org.apache.flink.optimizer.plan.DualInputPlanNode; -import org.apache.flink.optimizer.plan.NAryUnionPlanNode; -import org.apache.flink.optimizer.plan.OptimizedPlan; -import org.apache.flink.optimizer.plan.PlanNode; -import org.apache.flink.optimizer.plan.SingleInputPlanNode; -import org.apache.flink.optimizer.plan.SinkPlanNode; -import org.apache.flink.optimizer.plan.SourcePlanNode; -import org.apache.flink.optimizer.plan.WorksetIterationPlanNode; -import org.apache.flink.runtime.operators.DriverStrategy; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.runtime.operators.util.LocalStrategy; -import org.apache.flink.runtime.operators.util.TaskConfig; -import org.apache.flink.tez.runtime.TezTaskConfig; -import org.apache.flink.util.Visitor; -import org.apache.tez.dag.api.DAG; -import org.apache.tez.dag.api.TezConfiguration; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - - -public class TezDAGGenerator implements Visitor<PlanNode> { - - private static final Log LOG = LogFactory.getLog(TezDAGGenerator.class); - - private Map<PlanNode, FlinkVertex> vertices; // a map from optimizer nodes to Tez vertices - private List<FlinkEdge> edges; - private final int defaultMaxFan; - private final TezConfiguration 
tezConf; - - private final float defaultSortSpillingThreshold; - - public TezDAGGenerator (TezConfiguration tezConf, Configuration config) { - this.defaultMaxFan = config.getInteger(ConfigConstants.DEFAULT_SPILLING_MAX_FAN_KEY, - ConfigConstants.DEFAULT_SPILLING_MAX_FAN); - this.defaultSortSpillingThreshold = config.getFloat(ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD_KEY, - ConfigConstants.DEFAULT_SORT_SPILLING_THRESHOLD); - this.tezConf = tezConf; - } - - public DAG createDAG (OptimizedPlan program) throws Exception { - LOG.info ("Creating Tez DAG"); - this.vertices = new HashMap<PlanNode, FlinkVertex>(); - this.edges = new ArrayList<FlinkEdge>(); - program.accept(this); - - DAG dag = DAG.create(program.getJobName()); - for (FlinkVertex v : vertices.values()) { - dag.addVertex(v.createVertex(new TezConfiguration(tezConf))); - } - for (FlinkEdge e: edges) { - dag.addEdge(e.createEdge(new TezConfiguration(tezConf))); - } - - /* - * Temporarily throw an error until TEZ-1190 has been fixed or a workaround has been created - */ - if (containsSelfJoins()) { - throw new CompilerException("Dual-input operators with the same input (self-joins) are not yet supported"); - } - - this.vertices = null; - this.edges = null; - - LOG.info ("Tez DAG created"); - return dag; - } - - - @Override - public boolean preVisit(PlanNode node) { - if (this.vertices.containsKey(node)) { - // return false to prevent further descend - return false; - } - - if ((node instanceof BulkIterationPlanNode) || (node instanceof WorksetIterationPlanNode)) { - throw new CompilerException("Iterations are not yet supported by the Tez execution environment"); - } - - if ( (node.getBroadcastInputs() != null) && (!node.getBroadcastInputs().isEmpty())) { - throw new CompilerException("Broadcast inputs are not yet supported by the Tez execution environment"); - } - - FlinkVertex vertex = null; - - try { - if (node instanceof SourcePlanNode) { - vertex = createDataSourceVertex ((SourcePlanNode) node); - } - else if (node instanceof SinkPlanNode) { - vertex = createDataSinkVertex ((SinkPlanNode) node); - } - else if ((node instanceof SingleInputPlanNode)) { - vertex = createSingleInputVertex((SingleInputPlanNode) node); - } - else if (node instanceof DualInputPlanNode) { - vertex = createDualInputVertex((DualInputPlanNode) node); - } - else if (node instanceof NAryUnionPlanNode) { - vertex = createUnionVertex ((NAryUnionPlanNode) node); - } - else { - throw new CompilerException("Unrecognized node type: " + node.getClass().getName()); - } - - } - catch (Exception e) { - throw new CompilerException("Error translating node '" + node + "': " + e.getMessage(), e); - } - - if (vertex != null) { - this.vertices.put(node, vertex); - } - return true; - } - - @Override - public void postVisit (PlanNode node) { - try { - if (node instanceof SourcePlanNode) { - return; - } - final Iterator<Channel> inConns = node.getInputs().iterator(); - if (!inConns.hasNext()) { - throw new CompilerException("Bug: Found a non-source task with no input."); - } - int inputIndex = 0; - - FlinkVertex targetVertex = this.vertices.get(node); - TezTaskConfig targetVertexConfig = targetVertex.getConfig(); - - - while (inConns.hasNext()) { - Channel input = inConns.next(); - inputIndex += translateChannel(input, inputIndex, targetVertex, targetVertexConfig, false); - } - } - catch (Exception e) { - e.printStackTrace(); - throw new CompilerException( - "An error occurred while translating the optimized plan to a Tez DAG: " + e.getMessage(), e); - } - } - - private 
FlinkVertex createSingleInputVertex(SingleInputPlanNode node) throws CompilerException, IOException { - - final String taskName = node.getNodeName(); - final DriverStrategy ds = node.getDriverStrategy(); - final int dop = node.getParallelism(); - - final TezTaskConfig config= new TezTaskConfig(new Configuration()); - - config.setDriver(ds.getDriverClass()); - config.setDriverStrategy(ds); - config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper()); - config.setStubParameters(node.getProgramOperator().getParameters()); - - for(int i=0;i<ds.getNumRequiredComparators();i++) { - config.setDriverComparator(node.getComparator(i), i); - } - assignDriverResources(node, config); - - return new FlinkProcessorVertex(taskName, dop, config); - } - - private FlinkVertex createDualInputVertex(DualInputPlanNode node) throws CompilerException, IOException { - final String taskName = node.getNodeName(); - final DriverStrategy ds = node.getDriverStrategy(); - final int dop = node.getParallelism(); - - final TezTaskConfig config= new TezTaskConfig(new Configuration()); - - config.setDriver(ds.getDriverClass()); - config.setDriverStrategy(ds); - config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper()); - config.setStubParameters(node.getProgramOperator().getParameters()); - - if (node.getComparator1() != null) { - config.setDriverComparator(node.getComparator1(), 0); - } - if (node.getComparator2() != null) { - config.setDriverComparator(node.getComparator2(), 1); - } - if (node.getPairComparator() != null) { - config.setDriverPairComparator(node.getPairComparator()); - } - - assignDriverResources(node, config); - - LOG.info("Creating processor vertex " + taskName + " with parallelism " + dop); - - return new FlinkProcessorVertex(taskName, dop, config); - } - - private FlinkVertex createDataSinkVertex(SinkPlanNode node) throws CompilerException, IOException { - final String taskName = node.getNodeName(); - final int dop = node.getParallelism(); - - final TezTaskConfig config = new TezTaskConfig(new Configuration()); - - // set user code - config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper()); - config.setStubParameters(node.getProgramOperator().getParameters()); - - LOG.info("Creating data sink vertex " + taskName + " with parallelism " + dop); - - return new FlinkDataSinkVertex(taskName, dop, config); - } - - private FlinkVertex createDataSourceVertex(SourcePlanNode node) throws CompilerException, IOException { - final String taskName = node.getNodeName(); - int dop = node.getParallelism(); - - final TezTaskConfig config= new TezTaskConfig(new Configuration()); - - config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper()); - config.setStubParameters(node.getProgramOperator().getParameters()); - - InputFormat format = node.getDataSourceNode().getOperator().getFormatWrapper().getUserCodeObject(); - - config.setInputFormat(format); - - // Create as many data sources as input splits - InputSplit[] splits = format.createInputSplits((dop > 0) ? 
dop : 1); - dop = splits.length; - - LOG.info("Creating data source vertex " + taskName + " with parallelism " + dop); - - return new FlinkDataSourceVertex(taskName, dop, config); - } - - private FlinkVertex createUnionVertex(NAryUnionPlanNode node) throws CompilerException, IOException { - final String taskName = node.getNodeName(); - final int dop = node.getParallelism(); - final TezTaskConfig config= new TezTaskConfig(new Configuration()); - - LOG.info("Creating union vertex " + taskName + " with parallelism " + dop); - - return new FlinkUnionVertex (taskName, dop, config); - } - - - private void assignDriverResources(PlanNode node, TaskConfig config) { - final double relativeMem = node.getRelativeMemoryPerSubTask(); - if (relativeMem > 0) { - config.setRelativeMemoryDriver(relativeMem); - config.setFilehandlesDriver(this.defaultMaxFan); - config.setSpillingThresholdDriver(this.defaultSortSpillingThreshold); - } - } - - private void assignLocalStrategyResources(Channel c, TaskConfig config, int inputNum) { - if (c.getRelativeMemoryLocalStrategy() > 0) { - config.setRelativeMemoryInput(inputNum, c.getRelativeMemoryLocalStrategy()); - config.setFilehandlesInput(inputNum, this.defaultMaxFan); - config.setSpillingThresholdInput(inputNum, this.defaultSortSpillingThreshold); - } - } - - private int translateChannel(Channel input, int inputIndex, FlinkVertex targetVertex, - TezTaskConfig targetVertexConfig, boolean isBroadcast) throws Exception - { - final PlanNode inputPlanNode = input.getSource(); - final Iterator<Channel> allInChannels; - - - allInChannels = Collections.singletonList(input).iterator(); - - - // check that the type serializer is consistent - TypeSerializerFactory<?> typeSerFact = null; - - while (allInChannels.hasNext()) { - final Channel inConn = allInChannels.next(); - - if (typeSerFact == null) { - typeSerFact = inConn.getSerializer(); - } else if (!typeSerFact.equals(inConn.getSerializer())) { - throw new CompilerException("Conflicting types in union operator."); - } - - final PlanNode sourceNode = inConn.getSource(); - FlinkVertex sourceVertex = this.vertices.get(sourceNode); - TezTaskConfig sourceVertexConfig = sourceVertex.getConfig(); //TODO ??? need to create a new TezConfig ??? - - connectJobVertices( - inConn, inputIndex, sourceVertex, sourceVertexConfig, targetVertex, targetVertexConfig, isBroadcast); - } - - // the local strategy is added only once. 
In the non-union case, that is the actual edge, - // in the union case, it is the edge between the union and the target node - addLocalInfoFromChannelToConfig(input, targetVertexConfig, inputIndex, isBroadcast); - return 1; - } - - private void connectJobVertices(Channel channel, int inputNumber, - final FlinkVertex sourceVertex, final TezTaskConfig sourceConfig, - final FlinkVertex targetVertex, final TezTaskConfig targetConfig, boolean isBroadcast) - throws CompilerException { - - // -------------- configure the source task's ship strategy in the task config -------------- - final int outputIndex = sourceConfig.getNumOutputs(); - sourceConfig.addOutputShipStrategy(channel.getShipStrategy()); - if (outputIndex == 0) { - sourceConfig.setOutputSerializer(channel.getSerializer()); - } - if (channel.getShipStrategyComparator() != null) { - sourceConfig.setOutputComparator(channel.getShipStrategyComparator(), outputIndex); - } - - if (channel.getShipStrategy() == ShipStrategyType.PARTITION_RANGE) { - - final DataDistribution dataDistribution = channel.getDataDistribution(); - if (dataDistribution != null) { - sourceConfig.setOutputDataDistribution(dataDistribution, outputIndex); - } else { - throw new RuntimeException("Range partitioning requires a data distribution"); - // TODO: inject code and configuration for automatic histogram generation - } - } - - // ---------------- configure the receiver ------------------- - if (isBroadcast) { - targetConfig.addBroadcastInputToGroup(inputNumber); - } else { - targetConfig.addInputToGroup(inputNumber); - } - - //----------------- connect source and target with edge ------------------------------ - - FlinkEdge edge; - ShipStrategyType shipStrategy = channel.getShipStrategy(); - TypeSerializer<?> serializer = channel.getSerializer().getSerializer(); - if ((shipStrategy == ShipStrategyType.FORWARD) || (shipStrategy == ShipStrategyType.NONE)) { - edge = new FlinkForwardEdge(sourceVertex, targetVertex, serializer); - // For forward edges, give the target (downstream) operator the same parallelism as the source operator - targetVertex.setParallelism(sourceVertex.getParallelism()); - } - else if (shipStrategy == ShipStrategyType.BROADCAST) { - edge = new FlinkBroadcastEdge(sourceVertex, targetVertex, serializer); - } - else if (shipStrategy == ShipStrategyType.PARTITION_HASH) { - edge = new FlinkPartitionEdge(sourceVertex, targetVertex, serializer); - } - else { - throw new CompilerException("Ship strategy between nodes " + sourceVertex.getVertex().getName() + " and " + targetVertex.getVertex().getName() + " is currently not supported"); - } - - // Tez-specific bookkeeping - // TODO: This probably will not work for vertices with multiple outputs - sourceVertex.addNumberOfSubTasksInOutput(targetVertex.getParallelism(), outputIndex); - targetVertex.addInput(sourceVertex, inputNumber); - - - edges.add(edge); - } - - private void addLocalInfoFromChannelToConfig(Channel channel, TaskConfig config, int inputNum, boolean isBroadcastChannel) { - // serializer - if (isBroadcastChannel) { - config.setBroadcastInputSerializer(channel.getSerializer(), inputNum); - - if (channel.getLocalStrategy() != LocalStrategy.NONE || (channel.getTempMode() != null && channel.getTempMode() != TempMode.NONE)) { - throw new CompilerException("Found local strategy or temp mode on a broadcast variable channel."); - } else { - return; - } - } else { - config.setInputSerializer(channel.getSerializer(), inputNum); - } - - // local strategy - if (channel.getLocalStrategy() != LocalStrategy.NONE) { -
config.setInputLocalStrategy(inputNum, channel.getLocalStrategy()); - if (channel.getLocalStrategyComparator() != null) { - config.setInputComparator(channel.getLocalStrategyComparator(), inputNum); - } - } - - assignLocalStrategyResources(channel, config, inputNum); - - // materialization / caching - if (channel.getTempMode() != null) { - final TempMode tm = channel.getTempMode(); - - boolean needsMemory = false; - if (tm.breaksPipeline()) { - config.setInputAsynchronouslyMaterialized(inputNum, true); - needsMemory = true; - } - if (tm.isCached()) { - config.setInputCached(inputNum, true); - needsMemory = true; - } - - if (needsMemory) { - // sanity check - if (tm == null || tm == TempMode.NONE || channel.getRelativeTempMemory() <= 0) { - throw new CompilerException("Bug in compiler: Inconsistent description of input materialization."); - } - config.setRelativeInputMaterializationMemory(inputNum, channel.getRelativeTempMemory()); - } - } - } - - private boolean containsSelfJoins () { - for (FlinkVertex v : vertices.values()) { - ArrayList<FlinkVertex> predecessors = new ArrayList<FlinkVertex>(); - for (FlinkEdge e : edges) { - if (e.target == v) { - if (predecessors.contains(e.source)) { - return true; - } - predecessors.add(e.source); - } - } - } - return false; - } - -}
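For reference, the generator above was driven roughly as follows: the Flink optimizer turns a program's Plan into an OptimizedPlan, the visitor walks that plan to build a Tez DAG, and a TezClient submits the DAG to the cluster. Below is a minimal sketch, assuming the Flink 0.9-era optimizer API and the Tez 0.5 client API; the getProgramPlan() helper is hypothetical and stands in for ExecutionEnvironment#createProgramPlan():

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.optimizer.DataStatistics;
    import org.apache.flink.optimizer.Optimizer;
    import org.apache.flink.optimizer.costs.DefaultCostEstimator;
    import org.apache.flink.optimizer.plan.OptimizedPlan;
    import org.apache.flink.tez.dag.TezDAGGenerator;
    import org.apache.tez.client.TezClient;
    import org.apache.tez.dag.api.DAG;
    import org.apache.tez.dag.api.TezConfiguration;
    import org.apache.tez.dag.api.client.DAGClient;

    public class TezSubmissionSketch {

        public static void main(String[] args) throws Exception {
            Configuration flinkConf = new Configuration();
            TezConfiguration tezConf = new TezConfiguration();

            // Optimize the user program: Plan -> OptimizedPlan.
            Plan plan = getProgramPlan();
            Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), flinkConf);
            OptimizedPlan optimizedPlan = optimizer.compile(plan);

            // Translate the optimized plan into a Tez DAG (the visitor above).
            TezDAGGenerator dagGenerator = new TezDAGGenerator(tezConf, flinkConf);
            DAG dag = dagGenerator.createDAG(optimizedPlan);

            // Submit the DAG through a Tez client session and wait for completion.
            TezClient tezClient = TezClient.create(plan.getJobName(), tezConf);
            try {
                tezClient.start();
                DAGClient dagClient = tezClient.submitDAG(dag);
                dagClient.waitForCompletion();
            } finally {
                tezClient.stop();
            }
        }

        // Hypothetical helper: in practice the Plan comes from
        // ExecutionEnvironment#createProgramPlan() after the program has defined its sinks.
        private static Plan getProgramPlan() {
            throw new UnsupportedOperationException("supply a real program plan");
        }
    }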
http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java deleted file mode 100644 index 707fd47..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ConnectedComponentsStep.java +++ /dev/null @@ -1,203 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.examples; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.util.Collector; - - -public class ConnectedComponentsStep implements ProgramDescription { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String... 
args) throws Exception { - - if (!parseParameters(args)) { - return; - } - - // set up execution environment - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // read vertex and edge data - DataSet<Long> vertices = getVertexDataSet(env); - DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env).flatMap(new UndirectEdge()); - - // assign the initial components (equal to the vertex id) - DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>()); - - DataSet<Tuple2<Long, Long>> nextComponents = verticesWithInitialId - .join(edges) - .where(0).equalTo(0) - .with(new NeighborWithComponentIDJoin()) - .groupBy(0).aggregate(Aggregations.MIN, 1) - .join(verticesWithInitialId) - .where(0).equalTo(0) - .with(new ComponentIdFilter()); - - - // emit result - if (fileOutput) { - nextComponents.writeAsCsv(outputPath, "\n", " "); - } else { - nextComponents.print(); - } - - // execute program - env.execute("Connected Components Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Function that turns a value into a 2-tuple where both fields are that value. - */ - @ForwardedFields("*->f0") - public static final class DuplicateValue<T> implements MapFunction<T, Tuple2<T, T>> { - - @Override - public Tuple2<T, T> map(T vertex) { - return new Tuple2<T, T>(vertex, vertex); - } - } - - /** - * Undirects edges by emitting, for each input edge, the edge itself and an inverted copy. - */ - public static final class UndirectEdge implements FlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { - Tuple2<Long, Long> invertedEdge = new Tuple2<Long, Long>(); - - @Override - public void flatMap(Tuple2<Long, Long> edge, Collector<Tuple2<Long, Long>> out) { - invertedEdge.f0 = edge.f1; - invertedEdge.f1 = edge.f0; - out.collect(edge); - out.collect(invertedEdge); - } - } - - /** - * UDF that joins a (Vertex-ID, Component-ID) pair, which represents the current component that - * a vertex is associated with, with a (Source-Vertex-ID, Target-Vertex-ID) edge. The function - * produces a (Target-Vertex-ID, Component-ID) pair.
- */ - @ForwardedFieldsFirst("f1->f1") - @ForwardedFieldsSecond("f1->f0") - public static final class NeighborWithComponentIDJoin implements JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { - - @Override - public Tuple2<Long, Long> join(Tuple2<Long, Long> vertexWithComponent, Tuple2<Long, Long> edge) { - return new Tuple2<Long, Long>(edge.f1, vertexWithComponent.f1); - } - } - - - - @ForwardedFieldsFirst("*") - public static final class ComponentIdFilter implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> { - - @Override - public void join(Tuple2<Long, Long> candidate, Tuple2<Long, Long> old, Collector<Tuple2<Long, Long>> out) { - if (candidate.f1 < old.f1) { - out.collect(candidate); - } - } - } - - - - @Override - public String getDescription() { - return "Parameters: <vertices-path> <edges-path> <result-path> <max-number-of-iterations>"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String verticesPath = null; - private static String edgesPath = null; - private static String outputPath = null; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] programArguments) { - - if(programArguments.length > 0) { - // parse input arguments - fileOutput = true; - if(programArguments.length == 4) { - verticesPath = programArguments[0]; - edgesPath = programArguments[1]; - outputPath = programArguments[2]; - maxIterations = Integer.parseInt(programArguments[3]); - } else { - System.err.println("Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>"); - return false; - } - } else { - System.out.println("Executing Connected Components example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: ConnectedComponents <vertices path> <edges path> <result path> <max number of iterations>"); - } - return true; - } - - private static DataSet<Long> getVertexDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(verticesPath).types(Long.class) - .map( - new MapFunction<Tuple1<Long>, Long>() { - public Long map(Tuple1<Long> value) { return value.f0; } - }); - } else { - return ConnectedComponentsData.getDefaultVertexDataSet(env); - } - } - - private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class); - } else { - return ConnectedComponentsData.getDefaultEdgeDataSet(env); - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java deleted file mode 100644 index c65fb69..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/ExampleDriver.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.examples; - -import org.apache.hadoop.util.ProgramDriver; -import org.apache.tez.common.counters.TezCounters; -import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.client.DAGClient; -import org.apache.tez.dag.api.client.DAGStatus; -import org.apache.tez.dag.api.client.Progress; -import org.apache.tez.dag.api.client.StatusGetOpts; -import org.apache.tez.dag.api.client.VertexStatus; - -import java.io.IOException; -import java.text.DecimalFormat; -import java.util.EnumSet; -import java.util.Set; - -public class ExampleDriver { - - private static final DecimalFormat formatter = new DecimalFormat("###.##%"); - - public static void main(String [] args){ - int exitCode = -1; - ProgramDriver pgd = new ProgramDriver(); - try { - pgd.addClass("wc", WordCount.class, - "Wordcount"); - pgd.addClass("tpch3", TPCHQuery3.class, - "Modified TPC-H 3 query"); - pgd.addClass("tc", TransitiveClosureNaiveStep.class, - "One step of transitive closure"); - pgd.addClass("pr", PageRankBasicStep.class, - "One step of PageRank"); - pgd.addClass("cc", ConnectedComponentsStep.class, - "One step of connected components"); - exitCode = pgd.run(args); - } catch(Throwable e){ - e.printStackTrace(); - } - System.exit(exitCode); - } - - public static void printDAGStatus(DAGClient dagClient, String[] vertexNames) - throws IOException, TezException { - printDAGStatus(dagClient, vertexNames, false, false); - } - - public static void printDAGStatus(DAGClient dagClient, String[] vertexNames, boolean displayDAGCounters, boolean displayVertexCounters) - throws IOException, TezException { - Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - DAGStatus dagStatus = dagClient.getDAGStatus( - (displayDAGCounters ? opts : null)); - Progress progress = dagStatus.getDAGProgress(); - double vProgressFloat = 0.0f; - if (progress != null) { - System.out.println(""); - System.out.println("DAG: State: " - + dagStatus.getState() - + " Progress: " - + (progress.getTotalTaskCount() < 0 ? formatter.format(0.0f) : - formatter.format((double)(progress.getSucceededTaskCount()) - /progress.getTotalTaskCount()))); - for (String vertexName : vertexNames) { - VertexStatus vStatus = dagClient.getVertexStatus(vertexName, - (displayVertexCounters ? 
opts : null)); - if (vStatus == null) { - System.out.println("Could not retrieve status for vertex: " - + vertexName); - continue; - } - Progress vProgress = vStatus.getProgress(); - if (vProgress != null) { - vProgressFloat = 0.0f; - if (vProgress.getTotalTaskCount() == 0) { - vProgressFloat = 1.0f; - } else if (vProgress.getTotalTaskCount() > 0) { - vProgressFloat = (double)vProgress.getSucceededTaskCount() - /vProgress.getTotalTaskCount(); - } - System.out.println("VertexStatus:" - + " VertexName: " - + (vertexName.equals("ivertex1") ? "intermediate-reducer" - : vertexName) - + " Progress: " + formatter.format(vProgressFloat)); - } - if (displayVertexCounters) { - TezCounters counters = vStatus.getVertexCounters(); - if (counters != null) { - System.out.println("Vertex Counters for " + vertexName + ": " - + counters); - } - } - } - } - if (displayDAGCounters) { - TezCounters counters = dagStatus.getDAGCounters(); - if (counters != null) { - System.out.println("DAG Counters: " + counters); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java deleted file mode 100644 index 031893d..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/PageRankBasicStep.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.examples; - - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.examples.java.graph.util.PageRankData; -import org.apache.flink.util.Collector; - -import java.util.ArrayList; - -import static org.apache.flink.api.java.aggregation.Aggregations.SUM; - -public class PageRankBasicStep { - - private static final double DAMPENING_FACTOR = 0.85; - private static final double EPSILON = 0.0001; - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up execution environment - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - // get input data - DataSet<Long> pagesInput = getPagesDataSet(env); - DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env); - - // assign initial rank to pages - DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput. - map(new RankAssigner((1.0d / numPages))); - - // build adjacency list from link input - DataSet<Tuple2<Long, Long[]>> adjacencyListInput = - linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList()); - - DataSet<Tuple2<Long, Double>> newRanks = pagesWithRanks - .join(adjacencyListInput).where(0).equalTo(0) - .flatMap(new JoinVertexWithEdgesMatch()) - .groupBy(0).aggregate(SUM, 1) - .map(new Dampener(DAMPENING_FACTOR, numPages)); - - - // emit result - if(fileOutput) { - newRanks.writeAsCsv(outputPath, "\n", " "); - } else { - newRanks.print(); - } - - // execute program - env.execute("Basic Page Rank Example"); - - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * A map function that assigns an initial rank to all pages. - */ - public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> { - Tuple2<Long, Double> outPageWithRank; - - public RankAssigner(double rank) { - this.outPageWithRank = new Tuple2<Long, Double>(-1l, rank); - } - - @Override - public Tuple2<Long, Double> map(Long page) { - outPageWithRank.f0 = page; - return outPageWithRank; - } - } - - /** - * A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges - * originate. Run as a pre-processing step. 
- */ - @ForwardedFields("0") - public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> { - - private final ArrayList<Long> neighbors = new ArrayList<Long>(); - - @Override - public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) { - neighbors.clear(); - Long id = 0L; - - for (Tuple2<Long, Long> n : values) { - id = n.f0; - neighbors.add(n.f1); - } - out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()]))); - } - } - - /** - * Function applied to the join result that distributes a vertex's rank evenly to all of its neighbors. - */ - public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> { - - @Override - public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out) { - Long[] neighbors = value.f1.f1; - double rank = value.f0.f1; - double rankToDistribute = rank / ((double) neighbors.length); - - for (int i = 0; i < neighbors.length; i++) { - out.collect(new Tuple2<Long, Double>(neighbors[i], rankToDistribute)); - } - } - } - - /** - * The function that applies the PageRank dampening formula. - */ - @ForwardedFields("0") - public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> { - - private final double dampening; - private final double randomJump; - - public Dampener(double dampening, double numVertices) { - this.dampening = dampening; - this.randomJump = (1 - dampening) / numVertices; - } - - @Override - public Tuple2<Long, Double> map(Tuple2<Long, Double> value) { - value.f1 = (value.f1 * dampening) + randomJump; - return value; - } - } - - /** - * Filter that keeps only vertices whose rank changed by more than the threshold EPSILON.
- */ - public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> { - - @Override - public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) { - return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON; - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String pagesInputPath = null; - private static String linksInputPath = null; - private static String outputPath = null; - private static long numPages = 0; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - if(args.length == 5) { - fileOutput = true; - pagesInputPath = args[0]; - linksInputPath = args[1]; - outputPath = args[2]; - numPages = Integer.parseInt(args[3]); - maxIterations = Integer.parseInt(args[4]); - } else { - System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>"); - return false; - } - } else { - System.out.println("Executing PageRank Basic example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>"); - - numPages = PageRankData.getNumberOfPages(); - } - return true; - } - - private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env - .readCsvFile(pagesInputPath) - .fieldDelimiter(' ') - .lineDelimiter("\n") - .types(Long.class) - .map(new MapFunction<Tuple1<Long>, Long>() { - @Override - public Long map(Tuple1<Long> v) { return v.f0; } - }); - } else { - return PageRankData.getDefaultPagesDataSet(env); - } - } - - private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env) { - if(fileOutput) { - return env.readCsvFile(linksInputPath) - .fieldDelimiter(' ') - .lineDelimiter("\n") - .types(Long.class, Long.class); - } else { - return PageRankData.getDefaultEdgeDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java deleted file mode 100644 index d61f80e..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TPCHQuery3.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.examples; - -import org.apache.flink.api.common.functions.FilterFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.aggregation.Aggregations; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.tez.client.RemoteTezEnvironment; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; - -public class TPCHQuery3 { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - final RemoteTezEnvironment env = RemoteTezEnvironment.create(); - env.setParallelism(400); - - - // get input data - DataSet<Lineitem> lineitems = getLineitemDataSet(env); - DataSet<Order> orders = getOrdersDataSet(env); - DataSet<Customer> customers = getCustomerDataSet(env); - - // Filter market segment "AUTOMOBILE" - customers = customers.filter( - new FilterFunction<Customer>() { - @Override - public boolean filter(Customer c) { - return c.getMktsegment().equals("AUTOMOBILE"); - } - }); - - // Filter all Orders with o_orderdate < 12.03.1995 - orders = orders.filter( - new FilterFunction<Order>() { - private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - private final Date date = format.parse("1995-03-12"); - - @Override - public boolean filter(Order o) throws ParseException { - return format.parse(o.getOrderdate()).before(date); - } - }); - - // Filter all Lineitems with l_shipdate > 12.03.1995 - lineitems = lineitems.filter( - new FilterFunction<Lineitem>() { - private final DateFormat format = new SimpleDateFormat("yyyy-MM-dd"); - private final Date date = format.parse("1995-03-12"); - - @Override - public boolean filter(Lineitem l) throws ParseException { - return format.parse(l.getShipdate()).after(date); - } - }); - - // Join customers with orders and package them into a ShippingPriorityItem - DataSet<ShippingPriorityItem> customerWithOrders = - customers.join(orders).where(0).equalTo(1) - .with( - new JoinFunction<Customer, Order, ShippingPriorityItem>() { - @Override - public ShippingPriorityItem join(Customer c, Order o) { - return new ShippingPriorityItem(o.getOrderKey(), 0.0, o.getOrderdate(), - o.getShippriority()); - } - }); - - // Join the last join result with Lineitems - DataSet<ShippingPriorityItem> result = - customerWithOrders.join(lineitems).where(0).equalTo(0) - .with( - new JoinFunction<ShippingPriorityItem, Lineitem, ShippingPriorityItem>() { - @Override - public ShippingPriorityItem join(ShippingPriorityItem i, Lineitem l) { - i.setRevenue(l.getExtendedprice() * (1 - l.getDiscount())); - return i; - } - }) - // Group by l_orderkey, o_orderdate and o_shippriority and compute revenue sum - .groupBy(0, 2, 3) - 
.aggregate(Aggregations.SUM, 1); - - // emit result - result.writeAsCsv(outputPath, "\n", "|"); - - // execute program - env.registerMainClass(TPCHQuery3.class); - env.execute("TPCH Query 3 Example"); - - } - - // ************************************************************************* - // DATA TYPES - // ************************************************************************* - - public static class Lineitem extends Tuple4<Integer, Double, Double, String> { - - public Integer getOrderkey() { return this.f0; } - public Double getDiscount() { return this.f2; } - public Double getExtendedprice() { return this.f1; } - public String getShipdate() { return this.f3; } - } - - public static class Customer extends Tuple2<Integer, String> { - - public Integer getCustKey() { return this.f0; } - public String getMktsegment() { return this.f1; } - } - - public static class Order extends Tuple4<Integer, Integer, String, Integer> { - - public Integer getOrderKey() { return this.f0; } - public Integer getCustKey() { return this.f1; } - public String getOrderdate() { return this.f2; } - public Integer getShippriority() { return this.f3; } - } - - public static class ShippingPriorityItem extends Tuple4<Integer, Double, String, Integer> { - - public ShippingPriorityItem() { } - - public ShippingPriorityItem(Integer o_orderkey, Double revenue, - String o_orderdate, Integer o_shippriority) { - this.f0 = o_orderkey; - this.f1 = revenue; - this.f2 = o_orderdate; - this.f3 = o_shippriority; - } - - public Integer getOrderkey() { return this.f0; } - public void setOrderkey(Integer orderkey) { this.f0 = orderkey; } - public Double getRevenue() { return this.f1; } - public void setRevenue(Double revenue) { this.f1 = revenue; } - - public String getOrderdate() { return this.f2; } - public Integer getShippriority() { return this.f3; } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static String lineitemPath; - private static String customerPath; - private static String ordersPath; - private static String outputPath; - - private static boolean parseParameters(String[] programArguments) { - - if(programArguments.length > 0) { - if(programArguments.length == 4) { - lineitemPath = programArguments[0]; - customerPath = programArguments[1]; - ordersPath = programArguments[2]; - outputPath = programArguments[3]; - } else { - System.err.println("Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>"); - return false; - } - } else { - System.err.println("This program expects data from the TPC-H benchmark as input data.\n" + - " Due to legal restrictions, we can not ship generated data.\n" + - " You can find the TPC-H data generator at http://www.tpc.org/tpch/.\n" + - " Usage: TPCHQuery3 <lineitem-csv path> <customer-csv path> <orders-csv path> <result path>"); - return false; - } - return true; - } - - private static DataSet<Lineitem> getLineitemDataSet(ExecutionEnvironment env) { - return env.readCsvFile(lineitemPath) - .fieldDelimiter('|') - .includeFields("1000011000100000") - .tupleType(Lineitem.class); - } - - private static DataSet<Customer> getCustomerDataSet(ExecutionEnvironment env) { - return env.readCsvFile(customerPath) - .fieldDelimiter('|') - .includeFields("10000010") - .tupleType(Customer.class); - } - - private static DataSet<Order> getOrdersDataSet(ExecutionEnvironment env) { - return env.readCsvFile(ordersPath) - 
.fieldDelimiter('|') - .includeFields("110010010") - .tupleType(Order.class); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java deleted file mode 100644 index b014c3e..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/TransitiveClosureNaiveStep.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.examples; - -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.api.common.functions.JoinFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.examples.java.graph.util.ConnectedComponentsData; -import org.apache.flink.util.Collector; - - -/* - * NOTE: - * This program is currently supposed to throw a Compiler Exception due to TEZ-1190 - */ - -public class TransitiveClosureNaiveStep implements ProgramDescription { - - - public static void main (String... 
args) throws Exception{ - - if (!parseParameters(args)) { - return; - } - - // set up execution environment - ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - DataSet<Tuple2<Long, Long>> edges = getEdgeDataSet(env); - - DataSet<Tuple2<Long,Long>> nextPaths = edges - .join(edges) - .where(1) - .equalTo(0) - .with(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - /** - left: Path (z,x) - x is reachable by z - right: Edge (x,y) - edge x-->y exists - out: Path (z,y) - y is reachable by z - */ - public Tuple2<Long, Long> join(Tuple2<Long, Long> left, Tuple2<Long, Long> right) throws Exception { - return new Tuple2<Long, Long>( - new Long(left.f0), - new Long(right.f1)); - } - }) - .union(edges) - .groupBy(0, 1) - .reduceGroup(new GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() { - @Override - public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception { - out.collect(values.iterator().next()); - } - }); - - // emit result - if (fileOutput) { - nextPaths.writeAsCsv(outputPath, "\n", " "); - } else { - nextPaths.print(); - } - - // execute program - env.execute("Transitive Closure Example"); - - } - - @Override - public String getDescription() { - return "Parameters: <edges-path> <result-path> <max-number-of-iterations>"; - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String edgesPath = null; - private static String outputPath = null; - private static int maxIterations = 10; - - private static boolean parseParameters(String[] programArguments) { - - if (programArguments.length > 0) { - // parse input arguments - fileOutput = true; - if (programArguments.length == 3) { - edgesPath = programArguments[0]; - outputPath = programArguments[1]; - maxIterations = Integer.parseInt(programArguments[2]); - } else { - System.err.println("Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"); - return false; - } - } else { - System.out.println("Executing TransitiveClosure example with default parameters and built-in default data."); - System.out.println(" Provide parameters to read input data from files."); - System.out.println(" See the documentation for the correct format of input files."); - System.out.println(" Usage: TransitiveClosure <edges path> <result path> <max number of iterations>"); - } - return true; - } - - - private static DataSet<Tuple2<Long, Long>> getEdgeDataSet(ExecutionEnvironment env) { - - if(fileOutput) { - return env.readCsvFile(edgesPath).fieldDelimiter(' ').types(Long.class, Long.class); - } else { - return ConnectedComponentsData.getDefaultEdgeDataSet(env); - } - } - -} - http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java deleted file mode 100644 index e758156..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/examples/WordCount.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.examples; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.examples.java.wordcount.util.WordCountData; -import org.apache.flink.tez.client.RemoteTezEnvironment; -import org.apache.flink.util.Collector; - -public class WordCount { - - // ************************************************************************* - // PROGRAM - // ************************************************************************* - - public static void main(String[] args) throws Exception { - - if(!parseParameters(args)) { - return; - } - - // set up the execution environment - final RemoteTezEnvironment env = RemoteTezEnvironment.create(); - env.setParallelism(8); - - // get input data - DataSet<String> text = getTextDataSet(env); - - DataSet<Tuple2<String, Integer>> counts = - // split up the lines in pairs (2-tuples) containing: (word,1) - text.flatMap(new Tokenizer()) - // group by the tuple field "0" and sum up tuple field "1" - .groupBy(0) - .sum(1); - - // emit result - if(fileOutput) { - counts.writeAsCsv(outputPath, "\n", " "); - } else { - counts.print(); - } - - // execute program - env.registerMainClass(WordCount.class); - env.execute("WordCount Example"); - } - - // ************************************************************************* - // USER FUNCTIONS - // ************************************************************************* - - /** - * Implements the string tokenizer that splits sentences into words as a user-defined - * FlatMapFunction. The function takes a line (String) and splits it into - * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>). 
- */ - public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> { - - @Override - public void flatMap(String value, Collector<Tuple2<String, Integer>> out) { - // normalize and split the line - String[] tokens = value.toLowerCase().split("\\W+"); - - // emit the pairs - for (String token : tokens) { - if (token.length() > 0) { - out.collect(new Tuple2<String, Integer>(token, 1)); - } - } - } - } - - // ************************************************************************* - // UTIL METHODS - // ************************************************************************* - - private static boolean fileOutput = false; - private static String textPath; - private static String outputPath; - - private static boolean parseParameters(String[] args) { - - if(args.length > 0) { - // parse input arguments - fileOutput = true; - if(args.length == 2) { - textPath = args[0]; - outputPath = args[1]; - } else { - System.err.println("Usage: WordCount <text path> <result path>"); - return false; - } - } else { - System.out.println("Executing WordCount example with built-in default data."); - System.out.println(" Provide parameters to read input data from a file."); - System.out.println(" Usage: WordCount <text path> <result path>"); - } - return true; - } - - private static DataSet<String> getTextDataSet(ExecutionEnvironment env) { - if(fileOutput) { - // read the text file from given input path - return env.readTextFile(textPath); - } else { - // get default test text data - return WordCountData.getDefaultTextLineDataSet(env); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java deleted file mode 100644 index 8011d21..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSinkProcessor.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.runtime; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; -import org.apache.flink.runtime.operators.util.CloseableInputProvider; -import org.apache.flink.tez.runtime.input.TezReaderIterator; -import org.apache.flink.tez.util.DummyInvokable; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.util.MutableObjectIterator; -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalInput; -import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.library.api.KeyValueReader; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -public class DataSinkProcessor<IT> extends AbstractLogicalIOProcessor { - - // Tez stuff - private TezTaskConfig config; - protected Map<String, LogicalInput> inputs; - private List<KeyValueReader> readers; - private int numInputs; - private TezRuntimeEnvironment runtimeEnvironment; - private final AbstractInvokable invokable = new DummyInvokable(); - - // Flink stuff - private OutputFormat<IT> format; - private ClassLoader userCodeClassLoader = this.getClass().getClassLoader(); - private CloseableInputProvider<IT> localStrategy; - // input reader - private MutableObjectIterator<IT> reader; - // input iterator - private MutableObjectIterator<IT> input; - private TypeSerializerFactory<IT> inputTypeSerializerFactory; - - public DataSinkProcessor(ProcessorContext context) { - super(context); - } - - @Override - public void initialize() throws Exception { - UserPayload payload = getContext().getUserPayload(); - Configuration conf = TezUtils.createConfFromUserPayload(payload); - - this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); - config.setTaskName(getContext().getTaskVertexName()); - - // hand roughly 70% of the memory available to this task over to Flink's memory manager - this.runtimeEnvironment = new TezRuntimeEnvironment((long) (0.7 * this.getContext().getTotalMemoryAvailableToTask())); - - this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.userCodeClassLoader); - - initOutputFormat(); - } - - @Override - public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { - - Preconditions.checkArgument((outputs == null) || (outputs.size() == 0)); - Preconditions.checkArgument(inputs.size() == 1); - - this.inputs = inputs; - this.numInputs = inputs.size(); - this.readers = new ArrayList<KeyValueReader>(numInputs); - if (this.inputs != null) { - for (LogicalInput input : this.inputs.values()) { - input.start(); - readers.add((KeyValueReader) input.getReader()); - } - } - - this.reader = new TezReaderIterator<IT>(readers.get(0)); - - this.invoke(); - } - - @Override - public void handleEvents(List<Event> processorEvents) { - } - - 
@Override - public void close() throws Exception { - this.runtimeEnvironment.getIOManager().shutdown(); - } - - private void invoke () { - try { - // initialize local strategies - switch (this.config.getInputLocalStrategy(0)) { - case NONE: - // nothing to do - localStrategy = null; - input = reader; - break; - case SORT: - // initialize sort local strategy - try { - // get type comparator - TypeComparatorFactory<IT> compFact = this.config.getInputComparator(0, this.userCodeClassLoader); - if (compFact == null) { - throw new Exception("Missing comparator factory for local strategy on input 0"); - } - - // initialize sorter - UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>( - this.runtimeEnvironment.getMemoryManager(), - this.runtimeEnvironment.getIOManager(), - this.reader, this.invokable, this.inputTypeSerializerFactory, compFact.createComparator(), - this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0), - this.config.getSpillingThresholdInput(0), false); - - this.localStrategy = sorter; - this.input = sorter.getIterator(); - } catch (Exception e) { - throw new RuntimeException("Initializing the input processing failed" + - (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } - break; - default: - throw new RuntimeException("Invalid local strategy for DataSinkTask"); - } - - final TypeSerializer<IT> serializer = this.inputTypeSerializerFactory.getSerializer(); - final MutableObjectIterator<IT> input = this.input; - final OutputFormat<IT> format = this.format; - - IT record = serializer.createInstance(); - format.open(this.getContext().getTaskIndex(), this.getContext().getVertexParallelism()); - - // write all records that the input delivers - while ((record = input.next(record)) != null) { - format.writeRecord(record); - } - - this.format.close(); - this.format = null; - } - catch (IOException e) { - throw new RuntimeException("Data sink processing failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e); - } - finally { - if (this.format != null) { - // close the format, if it has not been closed yet. - // This should only be the case if we had a previous error, or were canceled. - try { - this.format.close(); - } - catch (Throwable t) { - //TODO log warning message - } - } - // close local strategy if necessary - if (localStrategy != null) { - try { - this.localStrategy.close(); - } catch (Throwable t) { - //TODO log warning message - } - } - } - } - - private void initOutputFormat () { - try { - this.format = this.config.<OutputFormat<IT>>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(OutputFormat.class, this.userCodeClassLoader); - - // check that the instantiated class is in fact an OutputFormat - if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) { - throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + - OutputFormat.class.getName() + "' as is required."); - } - } - catch (ClassCastException ccex) { - throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex); - } - - // configure the stub. 
catch exceptions here extra, to report them as originating from the user code - try { - this.format.configure(this.config.getStubParameters()); - } - catch (Throwable t) { - throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " - + t.getMessage(), t); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java deleted file mode 100644 index dd3f843..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/DataSourceProcessor.java +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime; - -import com.google.common.base.Preconditions; -import org.apache.flink.api.common.distributions.DataDistribution; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypeComparatorFactory; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.core.io.InputSplit; -import org.apache.flink.runtime.operators.shipping.ShipStrategyType; -import org.apache.flink.tez.runtime.input.FlinkInput; -import org.apache.flink.tez.runtime.output.TezChannelSelector; -import org.apache.flink.tez.runtime.output.TezOutputCollector; -import org.apache.flink.tez.runtime.output.TezOutputEmitter; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.util.Collector; -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalInput; -import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.library.api.KeyValueWriter; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - - -public class DataSourceProcessor<OT> extends AbstractLogicalIOProcessor { - - private TezTaskConfig config; - protected Map<String, LogicalOutput> outputs; - private List<KeyValueWriter> writers; - private int numOutputs; - private Collector<OT> collector; - - private InputFormat<OT, InputSplit> format; - private TypeSerializerFactory<OT> serializerFactory; - private 
FlinkInput input; - private ClassLoader userCodeClassLoader = getClass().getClassLoader(); - - public DataSourceProcessor(ProcessorContext context) { - super(context); - } - - @Override - public void initialize() throws Exception { - UserPayload payload = getContext().getUserPayload(); - Configuration conf = TezUtils.createConfFromUserPayload(payload); - - this.config = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); - config.setTaskName(getContext().getTaskVertexName()); - - this.serializerFactory = config.getOutputSerializer(this.userCodeClassLoader); - - initInputFormat(); - } - - @Override - public void handleEvents(List<Event> processorEvents) { - } - - @Override - public void close() throws Exception { - } - - @Override - public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { - - Preconditions.checkArgument(inputs.size() == 1); - LogicalInput logicalInput = inputs.values().iterator().next(); - if (!(logicalInput instanceof FlinkInput)) { - throw new RuntimeException("Input to Flink Data Source Processor should be of type FlinkInput"); - } - this.input = (FlinkInput) logicalInput; - - // Initialize outputs and get their writers - this.outputs = outputs; - this.numOutputs = outputs.size(); - this.writers = new ArrayList<KeyValueWriter>(numOutputs); - if (this.outputs != null) { - for (LogicalOutput output : this.outputs.values()) { - output.start(); - writers.add((KeyValueWriter) output.getWriter()); - } - } - this.invoke(); - } - - private void invoke () { - final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer(); - try { - InputSplit split = input.getSplit(); - - OT record = serializer.createInstance(); - final InputFormat<OT, InputSplit> format = this.format; - format.open(split); - - int numOutputs = outputs.size(); - ArrayList<TezChannelSelector<OT>> channelSelectors = new ArrayList<TezChannelSelector<OT>>(numOutputs); - ArrayList<Integer> numStreamsInOutputs = this.config.getNumberSubtasksInOutput(); - for (int i = 0; i < numOutputs; i++) { - final ShipStrategyType strategy = config.getOutputShipStrategy(i); - final TypeComparatorFactory<OT> compFactory = config.getOutputComparator(i, this.userCodeClassLoader); - final DataDistribution dataDist = config.getOutputDataDistribution(i, this.userCodeClassLoader); - if (compFactory == null) { - channelSelectors.add(i, new TezOutputEmitter<OT>(strategy)); - } else if (dataDist == null) { - final TypeComparator<OT> comparator = compFactory.createComparator(); - channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator)); - } else { - final TypeComparator<OT> comparator = compFactory.createComparator(); - channelSelectors.add(i, new TezOutputEmitter<OT>(strategy, comparator, dataDist)); - } - } - collector = new TezOutputCollector<OT>(writers, channelSelectors, serializerFactory.getSerializer(), numStreamsInOutputs); - - while (!format.reachedEnd()) { - // build the next record and ship it if it is valid - if ((record = format.nextRecord(record)) != null) { - collector.collect(record); - } - } - format.close(); - - collector.close(); - - } - catch (Exception ex) { - // close the input format, suppressing any exception from the close itself, and rethrow the root cause - try { - this.format.close(); - } catch (Throwable t) { - // ignored: ex below is the original failure - } - throw new RuntimeException("Data source processing failed" + (ex.getMessage() == null ? "." : ": " + ex.getMessage()), ex); - } - } - - private void initInputFormat() { - try { - this.format = config.<InputFormat<OT, 
InputSplit>>getStubWrapper(this.userCodeClassLoader) - .getUserCodeObject(InputFormat.class, this.userCodeClassLoader); - - // check if the class is a subclass, if the check is required - if (!InputFormat.class.isAssignableFrom(this.format.getClass())) { - throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + - InputFormat.class.getName() + "' as is required."); - } - } - catch (ClassCastException ccex) { - throw new RuntimeException("The stub class is not a proper subclass of " + InputFormat.class.getName(), - ccex); - } - // configure the stub. catch exceptions here extra, to report them as originating from the user code - try { - this.format.configure(this.config.getStubParameters()); - } - catch (Throwable t) { - throw new RuntimeException("The user defined 'configure()' method caused an error: " + t.getMessage(), t); - } - } - - -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java deleted file mode 100644 index 14d9cde..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/RegularProcessor.java +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.tez.runtime; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.TaskInfo; -import org.apache.flink.api.common.accumulators.Accumulator; -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.operators.Driver; -import org.apache.flink.api.common.functions.util.RuntimeUDFContext; -import org.apache.flink.tez.util.EncodingUtils; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.conf.Configuration; -import org.apache.tez.common.TezUtils; -import org.apache.tez.dag.api.UserPayload; -import org.apache.tez.runtime.api.AbstractLogicalIOProcessor; -import org.apache.tez.runtime.api.Event; -import org.apache.tez.runtime.api.LogicalInput; -import org.apache.tez.runtime.api.LogicalOutput; -import org.apache.tez.runtime.api.ProcessorContext; -import org.apache.tez.runtime.library.api.KeyValueReader; -import org.apache.tez.runtime.library.api.KeyValueWriter; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; - - -public class RegularProcessor<S extends Function, OT> extends AbstractLogicalIOProcessor { - - private TezTask<S,OT> task; - protected Map<String, LogicalInput> inputs; - protected Map<String, LogicalOutput> outputs; - private List<KeyValueReader> readers; - private List<KeyValueWriter> writers; - private int numInputs; - private int numOutputs; - - - public RegularProcessor(ProcessorContext context) { - super(context); - } - - @Override - public void initialize() throws Exception { - UserPayload payload = getContext().getUserPayload(); - Configuration conf = TezUtils.createConfFromUserPayload(payload); - - TezTaskConfig taskConfig = (TezTaskConfig) EncodingUtils.decodeObjectFromString(conf.get(TezTaskConfig.TEZ_TASK_CONFIG), getClass().getClassLoader()); - taskConfig.setTaskName(getContext().getTaskVertexName()); - - RuntimeUDFContext runtimeUdfContext = new RuntimeUDFContext( - new TaskInfo( - getContext().getTaskVertexName(), - getContext().getTaskIndex(), - getContext().getVertexParallelism(), - getContext().getTaskAttemptNumber() - ), - getClass().getClassLoader(), - new ExecutionConfig(), - new HashMap<String, Future<Path>>(), - new HashMap<String, Accumulator<?, ?>>()); - - this.task = new TezTask<S, OT>(taskConfig, runtimeUdfContext, this.getContext().getTotalMemoryAvailableToTask()); - } - - @Override - public void handleEvents(List<Event> processorEvents) { - - } - - @Override - public void close() throws Exception { - task.getIOManager().shutdown(); - } - - @Override - public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { - - this.inputs = inputs; - this.outputs = outputs; - final Class<? 
extends Driver<S, OT>> driverClass = this.task.getTaskConfig().getDriver(); - Driver<S, OT> driver = InstantiationUtil.instantiate(driverClass, Driver.class); - this.numInputs = driver.getNumberOfInputs(); - this.numOutputs = outputs.size(); - - this.readers = new ArrayList<KeyValueReader>(numInputs); - // pre-fill the list so that it has exactly numInputs slots - for (int i = 0; i < numInputs; i++) { - this.readers.add(null); - } - HashMap<String, ArrayList<Integer>> inputPositions = ((TezTaskConfig) this.task.getTaskConfig()).getInputPositions(); - if (this.inputs != null) { - for (String name : this.inputs.keySet()) { - LogicalInput input = this.inputs.get(name); - input.start(); - ArrayList<Integer> positions = inputPositions.get(name); - for (Integer pos : positions) { - readers.set(pos, (KeyValueReader) input.getReader()); - } - } - } - - this.writers = new ArrayList<KeyValueWriter>(numOutputs); - if (this.outputs != null) { - for (LogicalOutput output : this.outputs.values()) { - output.start(); - writers.add((KeyValueWriter) output.getWriter()); - } - } - - // Do the work - task.invoke(readers, writers); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/8b8dfc16/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java b/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java deleted file mode 100644 index b61a9b6..0000000 --- a/flink-contrib/flink-tez/src/main/java/org/apache/flink/tez/runtime/TezRuntimeEnvironment.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.tez.runtime; - -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.memory.MemoryManager; - -public class TezRuntimeEnvironment { - - private final IOManager ioManager; - - private final MemoryManager memoryManager; - - public TezRuntimeEnvironment(long totalMemory) { - this.memoryManager = new MemoryManager(totalMemory, 1, MemoryManager.DEFAULT_PAGE_SIZE, MemoryType.HEAP, true); - this.ioManager = new IOManagerAsync(); - } - - public IOManager getIOManager() { - return ioManager; - } - - public MemoryManager getMemoryManager() { - return memoryManager; - } -}
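
A note for readers following this removal: all three deleted processors (DataSinkProcessor, DataSourceProcessor, RegularProcessor) bootstrap the same way in initialize() — the Tez UserPayload is unpacked into a Hadoop Configuration, and the string-encoded TezTaskConfig is decoded out of it. Below is a minimal, self-contained sketch of that payload round trip using Tez's TezUtils helpers (createUserPayloadFromConf / createConfFromUserPayload). It is an illustration of the mechanism, not the commit's actual code: the key name "flink.tez.task.config" and the encoded value are placeholders for the real TezTaskConfig.TEZ_TASK_CONFIG constant and the output of EncodingUtils.

import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.UserPayload;

public class PayloadRoundTripSketch {

    public static void main(String[] args) throws Exception {
        // DAG-generation side: put the (already string-encoded) task config into
        // a Hadoop Configuration and wrap the whole Configuration as a UserPayload.
        Configuration conf = new Configuration();
        conf.set("flink.tez.task.config", "rO0ABXNyLi4u"); // placeholder for an encoded TezTaskConfig
        UserPayload payload = TezUtils.createUserPayloadFromConf(conf);

        // Processor side (cf. the initialize() methods above): unwrap the payload
        // back into a Configuration and read the encoded config out again.
        Configuration decoded = TezUtils.createConfFromUserPayload(payload);
        System.out.println(decoded.get("flink.tez.task.config"));
    }
}

The round trip matters because the UserPayload is the natural per-vertex channel Tez offers for opaque setup data, which is why this runner serialized its whole task configuration into it rather than shipping it via files or environment variables.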