Repository: flink Updated Branches: refs/heads/master bed3da4a6 -> a911559a1
[FLINK-1592] [streaming] StreamGraph refactor to store vertex IDs as Integers Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a911559a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a911559a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a911559a Branch: refs/heads/master Commit: a911559a1d8d8d160edcdb76cd16812de80b0d1c Parents: bed3da4 Author: Gyula Fora <[email protected]> Authored: Sat Feb 21 14:23:04 2015 +0100 Committer: Gyula Fora <[email protected]> Committed: Sat Feb 21 14:23:04 2015 +0100 ---------------------------------------------------------------------- .../flink/streaming/api/StreamConfig.java | 52 +-- .../apache/flink/streaming/api/StreamGraph.java | 373 +++++++++---------- .../api/StreamingJobGraphGenerator.java | 152 ++++---- .../flink/streaming/api/WindowingOptimzier.java | 50 +-- .../streaming/api/datastream/DataStream.java | 12 +- .../api/datastream/DiscretizedStream.java | 42 ++- .../api/datastream/WindowedDataStream.java | 9 +- .../api/streamvertex/OutputHandler.java | 20 +- .../api/collector/DirectedOutputTest.java | 2 +- .../windowing/WindowIntegrationTest.java | 24 ++ 10 files changed, 384 insertions(+), 352 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java index 658b742..efc0b8b 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java @@ -47,7 +47,7 @@ public class StreamConfig implements Serializable { private static final String IS_CHAINED_VERTEX = "isChainedSubtask"; private static final String OUTPUT_NAME = "outputName_"; private static final String PARTITIONER_OBJECT = "partitionerObject_"; - private static final String VERTEX_NAME = "vertexName"; + private static final String VERTEX_NAME = "vertexID"; private static final String ITERATION_ID = "iteration-id"; private static final String OUTPUT_SELECTOR = "outputSelector"; private static final String DIRECTED_EMIT = "directedEmit"; @@ -60,7 +60,7 @@ public class StreamConfig implements Serializable { private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1"; private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2"; private static final String ITERATON_WAIT = "iterationWait"; - private static final String OUTPUTS = "outVertexNames"; + private static final String OUTPUTS = "outvertexIDs"; private static final String EDGES_IN_ORDER = "rwOrder"; // DEFAULT VALUES @@ -79,12 +79,12 @@ public class StreamConfig implements Serializable { return config; } - public void setVertexName(String vertexName) { - config.setString(VERTEX_NAME, vertexName); + public void setVertexID(Integer vertexID) { + config.setInteger(VERTEX_NAME, vertexID); } - public String getTaskName() { - return config.getString(VERTEX_NAME, "Missing"); + public Integer getVertexID() { + return config.getInteger(VERTEX_NAME, -1); } public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) { @@ -223,14 +223,14 @@ public class StreamConfig implements Serializable { return config.getLong(ITERATON_WAIT, 0); } - public <T> void setPartitioner(String output, StreamPartitioner<T> partitionerObject) { + public <T> void setPartitioner(Integer output, StreamPartitioner<T> partitionerObject) { config.setBytes(PARTITIONER_OBJECT + 
output, SerializationUtils.serialize(partitionerObject)); } @SuppressWarnings("unchecked") - public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String output) { + public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, Integer output) { StreamPartitioner<T> partitioner = null; try { partitioner = (StreamPartitioner<T>) InstantiationUtil.readObjectFromConfig( @@ -241,7 +241,7 @@ public class StreamConfig implements Serializable { return partitioner; } - public void setSelectedNames(String output, List<String> selected) { + public void setSelectedNames(Integer output, List<String> selected) { if (selected != null) { config.setBytes(OUTPUT_NAME + output, SerializationUtils.serialize((Serializable) selected)); @@ -252,7 +252,7 @@ public class StreamConfig implements Serializable { } @SuppressWarnings("unchecked") - public List<String> getSelectedNames(String output) { + public List<String> getSelectedNames(Integer output) { return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output, null)); } @@ -273,28 +273,28 @@ public class StreamConfig implements Serializable { return config.getInteger(NUMBER_OF_OUTPUTS, 0); } - public void setOutputs(List<String> outputVertexNames) { - config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputVertexNames)); + public void setOutputs(List<Integer> outputvertexIDs) { + config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs)); } @SuppressWarnings("unchecked") - public List<String> getOutputs(ClassLoader cl) { + public List<Integer> getOutputs(ClassLoader cl) { try { - return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl); + return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl); } catch (Exception e) { throw new RuntimeException("Could not instantiate outputs."); } } - public void setOutEdgesInOrder(List<Tuple2<String, String>> outEdgeList) { + public void 
setOutEdgesInOrder(List<Tuple2<Integer, Integer>> outEdgeList) { config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList)); } @SuppressWarnings("unchecked") - public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) { + public List<Tuple2<Integer, Integer>> getOutEdgesInOrder(ClassLoader cl) { try { - return (List<Tuple2<String, String>>) InstantiationUtil.readObjectFromConfig( + return (List<Tuple2<Integer, Integer>>) InstantiationUtil.readObjectFromConfig( this.config, EDGES_IN_ORDER, cl); } catch (Exception e) { throw new RuntimeException("Could not instantiate outputs."); @@ -323,34 +323,34 @@ public class StreamConfig implements Serializable { } } - public void setChainedOutputs(List<String> chainedOutputs) { + public void setChainedOutputs(List<Integer> chainedOutputs) { config.setBytes(CHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) chainedOutputs)); } @SuppressWarnings("unchecked") - public List<String> getChainedOutputs(ClassLoader cl) { + public List<Integer> getChainedOutputs(ClassLoader cl) { try { - return (List<String>) InstantiationUtil.readObjectFromConfig(this.config, + return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config, CHAINED_OUTPUTS, cl); } catch (Exception e) { throw new RuntimeException("Could not instantiate chained outputs."); } } - public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> chainedTaskConfigs) { + public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) { config.setBytes(CHAINED_TASK_CONFIG, SerializationUtils.serialize((Serializable) chainedTaskConfigs)); } @SuppressWarnings("unchecked") - public Map<String, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) { + public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) { try { - Map<String, StreamConfig> confs = (Map<String, StreamConfig>) InstantiationUtil + Map<Integer, StreamConfig> confs = (Map<Integer, 
StreamConfig>) InstantiationUtil .readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl); - return confs == null ? new HashMap<String, StreamConfig>() : confs; + return confs == null ? new HashMap<Integer, StreamConfig>() : confs; } catch (Exception e) { throw new RuntimeException("Could not instantiate configuration."); } @@ -373,12 +373,12 @@ public class StreamConfig implements Serializable { builder.append("\n======================="); builder.append("Stream Config"); builder.append("======================="); - builder.append("\nTask name: " + getTaskName()); + builder.append("\nTask name: " + getVertexID()); builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs()); builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs()); builder.append("\nOutput names: " + getOutputs(cl)); builder.append("\nPartitioning:"); - for (String outputname : getOutputs(cl)) { + for (Integer outputname : getOutputs(cl)) { builder.append("\n\t" + outputname + ": " + getPartitioner(cl, outputname)); } http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java index 8e2b49e..82cd954 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java @@ -65,31 +65,31 @@ public class StreamGraph extends StreamingPlan { private String jobName = DEAFULT_JOB_NAME; // Graph attributes - private Map<String, Integer> operatorParallelisms; - private Map<String, Long> 
bufferTimeouts; - private Map<String, List<String>> outEdgeLists; - private Map<String, List<Integer>> outEdgeTypes; - private Map<String, List<List<String>>> selectedNames; - private Map<String, List<String>> inEdgeLists; - private Map<String, List<StreamPartitioner<?>>> outputPartitioners; - private Map<String, String> operatorNames; - private Map<String, StreamInvokable<?, ?>> invokableObjects; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn1; - private Map<String, StreamRecordSerializer<?>> typeSerializersIn2; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut1; - private Map<String, StreamRecordSerializer<?>> typeSerializersOut2; - private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses; - private Map<String, List<OutputSelector<?>>> outputSelectors; - private Map<String, Integer> iterationIds; - private Map<Integer, String> iterationIDtoHeadName; - private Map<Integer, String> iterationIDtoTailName; - private Map<String, Integer> iterationTailCount; - private Map<String, Long> iterationTimeouts; - private Map<String, Map<String, OperatorState<?>>> operatorStates; - private Map<String, InputFormat<String, ?>> inputFormatLists; - private List<Map<String, ?>> containingMaps; - - private Set<String> sources; + private Map<Integer, Integer> operatorParallelisms; + private Map<Integer, Long> bufferTimeouts; + private Map<Integer, List<Integer>> outEdgeLists; + private Map<Integer, List<Integer>> outEdgeTypes; + private Map<Integer, List<List<String>>> selectedNames; + private Map<Integer, List<Integer>> inEdgeLists; + private Map<Integer, List<StreamPartitioner<?>>> outputPartitioners; + private Map<Integer, String> operatorNames; + private Map<Integer, StreamInvokable<?, ?>> invokableObjects; + private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn1; + private Map<Integer, StreamRecordSerializer<?>> typeSerializersIn2; + private Map<Integer, StreamRecordSerializer<?>> typeSerializersOut1; + private 
Map<Integer, StreamRecordSerializer<?>> typeSerializersOut2; + private Map<Integer, Class<? extends AbstractInvokable>> jobVertexClasses; + private Map<Integer, List<OutputSelector<?>>> outputSelectors; + private Map<Integer, Integer> iterationIds; + private Map<Integer, Integer> iterationIDtoHeadID; + private Map<Integer, Integer> iterationIDtoTailID; + private Map<Integer, Integer> iterationTailCount; + private Map<Integer, Long> iterationTimeouts; + private Map<Integer, Map<String, OperatorState<?>>> operatorStates; + private Map<Integer, InputFormat<String, ?>> inputFormatLists; + private List<Map<Integer, ?>> containingMaps; + + private Set<Integer> sources; private ExecutionConfig executionConfig; @@ -105,58 +105,58 @@ public class StreamGraph extends StreamingPlan { } public void initGraph() { - containingMaps = new ArrayList<Map<String, ?>>(); + containingMaps = new ArrayList<Map<Integer, ?>>(); - operatorParallelisms = new HashMap<String, Integer>(); + operatorParallelisms = new HashMap<Integer, Integer>(); containingMaps.add(operatorParallelisms); - bufferTimeouts = new HashMap<String, Long>(); + bufferTimeouts = new HashMap<Integer, Long>(); containingMaps.add(bufferTimeouts); - outEdgeLists = new HashMap<String, List<String>>(); + outEdgeLists = new HashMap<Integer, List<Integer>>(); containingMaps.add(outEdgeLists); - outEdgeTypes = new HashMap<String, List<Integer>>(); + outEdgeTypes = new HashMap<Integer, List<Integer>>(); containingMaps.add(outEdgeTypes); - selectedNames = new HashMap<String, List<List<String>>>(); + selectedNames = new HashMap<Integer, List<List<String>>>(); containingMaps.add(selectedNames); - inEdgeLists = new HashMap<String, List<String>>(); + inEdgeLists = new HashMap<Integer, List<Integer>>(); containingMaps.add(inEdgeLists); - outputPartitioners = new HashMap<String, List<StreamPartitioner<?>>>(); + outputPartitioners = new HashMap<Integer, List<StreamPartitioner<?>>>(); containingMaps.add(outputPartitioners); - operatorNames 
= new HashMap<String, String>(); + operatorNames = new HashMap<Integer, String>(); containingMaps.add(operatorNames); - invokableObjects = new HashMap<String, StreamInvokable<?, ?>>(); + invokableObjects = new HashMap<Integer, StreamInvokable<?, ?>>(); containingMaps.add(invokableObjects); - typeSerializersIn1 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersIn1 = new HashMap<Integer, StreamRecordSerializer<?>>(); containingMaps.add(typeSerializersIn1); - typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersIn2 = new HashMap<Integer, StreamRecordSerializer<?>>(); containingMaps.add(typeSerializersIn2); - typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersOut1 = new HashMap<Integer, StreamRecordSerializer<?>>(); containingMaps.add(typeSerializersOut1); - typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>(); + typeSerializersOut2 = new HashMap<Integer, StreamRecordSerializer<?>>(); containingMaps.add(typeSerializersOut1); - outputSelectors = new HashMap<String, List<OutputSelector<?>>>(); + outputSelectors = new HashMap<Integer, List<OutputSelector<?>>>(); containingMaps.add(outputSelectors); - jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>(); + jobVertexClasses = new HashMap<Integer, Class<? 
extends AbstractInvokable>>(); containingMaps.add(jobVertexClasses); - iterationIds = new HashMap<String, Integer>(); + iterationIds = new HashMap<Integer, Integer>(); containingMaps.add(jobVertexClasses); - iterationIDtoHeadName = new HashMap<Integer, String>(); - iterationIDtoTailName = new HashMap<Integer, String>(); - iterationTailCount = new HashMap<String, Integer>(); + iterationIDtoHeadID = new HashMap<Integer, Integer>(); + iterationIDtoTailID = new HashMap<Integer, Integer>(); + iterationTailCount = new HashMap<Integer, Integer>(); containingMaps.add(iterationTailCount); - iterationTimeouts = new HashMap<String, Long>(); + iterationTimeouts = new HashMap<Integer, Long>(); containingMaps.add(iterationTailCount); - operatorStates = new HashMap<String, Map<String, OperatorState<?>>>(); + operatorStates = new HashMap<Integer, Map<String, OperatorState<?>>>(); containingMaps.add(operatorStates); - inputFormatLists = new HashMap<String, InputFormat<String, ?>>(); + inputFormatLists = new HashMap<Integer, InputFormat<String, ?>>(); containingMaps.add(operatorStates); - sources = new HashSet<String>(); + sources = new HashSet<Integer>(); } /** * Adds a vertex to the streaming graph with the given parameters * - * @param vertexName - * Name of the vertex + * @param vertexID + * ID of the vertex * @param invokableObject * User defined operator * @param inTypeInfo @@ -168,38 +168,38 @@ public class StreamGraph extends StreamingPlan { * @param parallelism * Number of parallel instances created */ - public <IN, OUT> void addStreamVertex(String vertexName, + public <IN, OUT> void addStreamVertex(Integer vertexID, StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - addVertex(vertexName, StreamVertex.class, invokableObject, operatorName, parallelism); + addVertex(vertexID, StreamVertex.class, invokableObject, operatorName, parallelism); StreamRecordSerializer<IN> inSerializer 
= inTypeInfo != null ? new StreamRecordSerializer<IN>( inTypeInfo, executionConfig) : null; StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null ? new StreamRecordSerializer<OUT>( outTypeInfo, executionConfig) : null; - addTypeSerializers(vertexName, inSerializer, null, outSerializer, null); + addTypeSerializers(vertexID, inSerializer, null, outSerializer, null); if (LOG.isDebugEnabled()) { - LOG.debug("Vertex: {}", vertexName); + LOG.debug("Vertex: {}", vertexID); } } - public <IN, OUT> void addSourceVertex(String vertexName, + public <IN, OUT> void addSourceVertex(Integer vertexID, StreamInvokable<IN, OUT> invokableObject, TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName, int parallelism) { - addStreamVertex(vertexName, invokableObject, inTypeInfo, outTypeInfo, operatorName, + addStreamVertex(vertexID, invokableObject, inTypeInfo, outTypeInfo, operatorName, parallelism); - sources.add(vertexName); + sources.add(vertexID); } /** * Adds a vertex for the iteration head to the {@link JobGraph}. The * iterated values will be fed from this vertex back to the graph. 
* - * @param vertexName - * Name of the vertex + * @param vertexID + * ID of the vertex * @param iterationHead * Id of the iteration head * @param iterationID @@ -209,29 +209,29 @@ public class StreamGraph extends StreamingPlan { * @param waitTime * Max wait time for next record */ - public void addIterationHead(String vertexName, String iterationHead, Integer iterationID, + public void addIterationHead(Integer vertexID, Integer iterationHead, Integer iterationID, int parallelism, long waitTime) { - addVertex(vertexName, StreamIterationHead.class, null, null, parallelism); + addVertex(vertexID, StreamIterationHead.class, null, null, parallelism); chaining = false; - iterationIds.put(vertexName, iterationID); - iterationIDtoHeadName.put(iterationID, vertexName); + iterationIds.put(vertexID, iterationID); + iterationIDtoHeadID.put(iterationID, vertexID); - setSerializersFrom(iterationHead, vertexName); + setSerializersFrom(iterationHead, vertexID); - setEdge(vertexName, iterationHead, + setEdge(vertexID, iterationHead, outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0, new ArrayList<String>()); - iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), waitTime); + iterationTimeouts.put(iterationIDtoHeadID.get(iterationID), waitTime); if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SOURCE: {}", vertexName); + LOG.debug("ITERATION SOURCE: {}", vertexID); } - sources.add(vertexName); + sources.add(vertexID); } /** @@ -239,8 +239,8 @@ public class StreamGraph extends StreamingPlan { * intended to be iterated will be sent to this sink from the iteration * head. 
* - * @param vertexName - * Name of the vertex + * @param vertexID + * ID of the vertex * @param iterationTail * Id of the iteration tail * @param iterationID @@ -250,51 +250,50 @@ public class StreamGraph extends StreamingPlan { * @param waitTime * Max waiting time for next record */ - public void addIterationTail(String vertexName, String iterationTail, Integer iterationID, + public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID, long waitTime) { if (bufferTimeouts.get(iterationTail) == 0) { throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported."); } - addVertex(vertexName, StreamIterationTail.class, null, null, getParallelism(iterationTail)); + addVertex(vertexID, StreamIterationTail.class, null, null, getParallelism(iterationTail)); - iterationIds.put(vertexName, iterationID); - iterationIDtoTailName.put(iterationID, vertexName); + iterationIds.put(vertexID, iterationID); + iterationIDtoTailID.put(iterationID, vertexID); - setSerializersFrom(iterationTail, vertexName); - iterationTimeouts.put(iterationIDtoTailName.get(iterationID), waitTime); + setSerializersFrom(iterationTail, vertexID); + iterationTimeouts.put(iterationIDtoTailID.get(iterationID), waitTime); - setParallelism(iterationIDtoHeadName.get(iterationID), getParallelism(iterationTail)); - setBufferTimeout(iterationIDtoHeadName.get(iterationID), bufferTimeouts.get(iterationTail)); + setParallelism(iterationIDtoHeadID.get(iterationID), getParallelism(iterationTail)); + setBufferTimeout(iterationIDtoHeadID.get(iterationID), bufferTimeouts.get(iterationTail)); if (LOG.isDebugEnabled()) { - LOG.debug("ITERATION SINK: {}", vertexName); + LOG.debug("ITERATION SINK: {}", vertexID); } } - public <IN1, IN2, OUT> void addCoTask(String vertexName, + public <IN1, IN2, OUT> void addCoTask(Integer vertexID, CoInvokable<IN1, IN2, OUT> taskInvokableObject, TypeInformation<IN1> in1TypeInfo, TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> outTypeInfo, 
String operatorName, int parallelism) { - addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); + addVertex(vertexID, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism); - addTypeSerializers(vertexName, - new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig), + addTypeSerializers(vertexID, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig), new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null); if (LOG.isDebugEnabled()) { - LOG.debug("CO-TASK: {}", vertexName); + LOG.debug("CO-TASK: {}", vertexID); } } /** * Sets vertex parameters in the JobGraph * - * @param vertexName + * @param vertexID * Name of the vertex * @param vertexClass * The class of the vertex @@ -305,30 +304,30 @@ public class StreamGraph extends StreamingPlan { * @param parallelism * Number of parallel instances created */ - private void addVertex(String vertexName, Class<? extends AbstractInvokable> vertexClass, + private void addVertex(Integer vertexID, Class<? 
extends AbstractInvokable> vertexClass, StreamInvokable<?, ?> invokableObject, String operatorName, int parallelism) { - jobVertexClasses.put(vertexName, vertexClass); - setParallelism(vertexName, parallelism); - invokableObjects.put(vertexName, invokableObject); - operatorNames.put(vertexName, operatorName); - outEdgeLists.put(vertexName, new ArrayList<String>()); - outEdgeTypes.put(vertexName, new ArrayList<Integer>()); - selectedNames.put(vertexName, new ArrayList<List<String>>()); - outputSelectors.put(vertexName, new ArrayList<OutputSelector<?>>()); - inEdgeLists.put(vertexName, new ArrayList<String>()); - outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>()); - iterationTailCount.put(vertexName, 0); + jobVertexClasses.put(vertexID, vertexClass); + setParallelism(vertexID, parallelism); + invokableObjects.put(vertexID, invokableObject); + operatorNames.put(vertexID, operatorName); + outEdgeLists.put(vertexID, new ArrayList<Integer>()); + outEdgeTypes.put(vertexID, new ArrayList<Integer>()); + selectedNames.put(vertexID, new ArrayList<List<String>>()); + outputSelectors.put(vertexID, new ArrayList<OutputSelector<?>>()); + inEdgeLists.put(vertexID, new ArrayList<Integer>()); + outputPartitioners.put(vertexID, new ArrayList<StreamPartitioner<?>>()); + iterationTailCount.put(vertexID, 0); } /** * Connects two vertices in the JobGraph using the selected partitioner * settings * - * @param upStreamVertexName - * Name of the upstream(output) vertex - * @param downStreamVertexName - * Name of the downstream(input) vertex + * @param upStreamVertexID + * ID of the upstream(output) vertex + * @param downStreamVertexID + * ID of the downstream(input) vertex * @param partitionerObject * Partitioner object * @param typeNumber @@ -336,16 +335,16 @@ public class StreamGraph extends StreamingPlan { * @param outputNames * User defined names of the out edge */ - public void setEdge(String upStreamVertexName, String downStreamVertexName, + public void 
setEdge(Integer upStreamVertexID, Integer downStreamVertexID, StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) { - outEdgeLists.get(upStreamVertexName).add(downStreamVertexName); - outEdgeTypes.get(upStreamVertexName).add(typeNumber); - inEdgeLists.get(downStreamVertexName).add(upStreamVertexName); - outputPartitioners.get(upStreamVertexName).add(partitionerObject); - selectedNames.get(upStreamVertexName).add(outputNames); + outEdgeLists.get(upStreamVertexID).add(downStreamVertexID); + outEdgeTypes.get(upStreamVertexID).add(typeNumber); + inEdgeLists.get(downStreamVertexID).add(upStreamVertexID); + outputPartitioners.get(upStreamVertexID).add(partitionerObject); + selectedNames.get(upStreamVertexID).add(outputNames); } - public void removeEdge(String upStream, String downStream) { + public void removeEdge(Integer upStream, Integer downStream) { int inputIndex = getInEdges(downStream).indexOf(upStream); inEdgeLists.get(downStream).remove(inputIndex); @@ -356,71 +355,71 @@ public class StreamGraph extends StreamingPlan { outputPartitioners.get(upStream).remove(outputIndex); } - public void removeVertex(String toRemove) { - List<String> outEdges = new ArrayList<String>(getOutEdges(toRemove)); - List<String> inEdges = new ArrayList<String>(getInEdges(toRemove)); + public void removeVertex(Integer toRemove) { + List<Integer> outEdges = new ArrayList<Integer>(getOutEdges(toRemove)); + List<Integer> inEdges = new ArrayList<Integer>(getInEdges(toRemove)); - for (String output : outEdges) { + for (Integer output : outEdges) { removeEdge(toRemove, output); } - for (String input : inEdges) { + for (Integer input : inEdges) { removeEdge(input, toRemove); } - for (Map<String, ?> map : containingMaps) { + for (Map<Integer, ?> map : containingMaps) { map.remove(toRemove); } } - private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1, + private void addTypeSerializers(Integer vertexID, StreamRecordSerializer<?> in1, 
StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1, StreamRecordSerializer<?> out2) { - typeSerializersIn1.put(vertexName, in1); - typeSerializersIn2.put(vertexName, in2); - typeSerializersOut1.put(vertexName, out1); - typeSerializersOut2.put(vertexName, out2); + typeSerializersIn1.put(vertexID, in1); + typeSerializersIn2.put(vertexID, in2); + typeSerializersOut1.put(vertexID, out1); + typeSerializersOut2.put(vertexID, out2); } /** * Sets the number of parallel instances created for the given vertex. * - * @param vertexName - * Name of the vertex + * @param vertexID + * ID of the vertex * @param parallelism * Number of parallel instances created */ - public void setParallelism(String vertexName, int parallelism) { - operatorParallelisms.put(vertexName, parallelism); + public void setParallelism(Integer vertexID, int parallelism) { + operatorParallelisms.put(vertexID, parallelism); } - public int getParallelism(String vertexName) { - return operatorParallelisms.get(vertexName); + public int getParallelism(Integer vertexID) { + return operatorParallelisms.get(vertexID); } /** * Sets the input format for the given vertex. 
* - * @param vertexName + * @param vertexID * Name of the vertex * @param inputFormat * input format of the file source associated with the given * vertex */ - public void setInputFormat(String vertexName, InputFormat<String, ?> inputFormat) { - inputFormatLists.put(vertexName, inputFormat); + public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) { + inputFormatLists.put(vertexID, inputFormat); } - public void setBufferTimeout(String vertexName, long bufferTimeout) { - this.bufferTimeouts.put(vertexName, bufferTimeout); + public void setBufferTimeout(Integer vertexID, long bufferTimeout) { + this.bufferTimeouts.put(vertexID, bufferTimeout); } - public long getBufferTimeout(String vertexName) { - return this.bufferTimeouts.get(vertexName); + public long getBufferTimeout(Integer vertexID) { + return this.bufferTimeouts.get(vertexID); } - public void addOperatorState(String veretxName, String stateName, OperatorState<?> state) { + public void addOperatorState(Integer veretxName, String stateName, OperatorState<?> state) { Map<String, OperatorState<?>> states = operatorStates.get(veretxName); if (states == null) { states = new HashMap<String, OperatorState<?>>(); @@ -440,52 +439,52 @@ public class StreamGraph extends StreamingPlan { * Sets a user defined {@link OutputSelector} for the given operator. Used * for directed emits. * - * @param vertexName + * @param vertexID * Name of the vertex for which the output selector will be set * @param outputSelector * The user defined output selector. 
*/ - public <T> void setOutputSelector(String vertexName, OutputSelector<T> outputSelector) { - outputSelectors.get(vertexName).add(outputSelector); + public <T> void setOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) { + outputSelectors.get(vertexID).add(outputSelector); if (LOG.isDebugEnabled()) { - LOG.debug("Outputselector set for {}", vertexName); + LOG.debug("Outputselector set for {}", vertexID); } } - public <IN, OUT> void setInvokable(String vertexName, StreamInvokable<IN, OUT> invokableObject) { - invokableObjects.put(vertexName, invokableObject); + public <IN, OUT> void setInvokable(Integer vertexID, StreamInvokable<IN, OUT> invokableObject) { + invokableObjects.put(vertexID, invokableObject); } - public <OUT> void setOutType(String id, TypeInformation<OUT> outType) { + public <OUT> void setOutType(Integer id, TypeInformation<OUT> outType) { StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType, executionConfig); typeSerializersOut1.put(id, serializer); } - public StreamInvokable<?, ?> getInvokable(String vertexName) { - return invokableObjects.get(vertexName); + public StreamInvokable<?, ?> getInvokable(Integer vertexID) { + return invokableObjects.get(vertexID); } @SuppressWarnings("unchecked") - public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String vertexName) { - return (StreamRecordSerializer<OUT>) typeSerializersOut1.get(vertexName); + public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(Integer vertexID) { + return (StreamRecordSerializer<OUT>) typeSerializersOut1.get(vertexID); } @SuppressWarnings("unchecked") - public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String vertexName) { - return (StreamRecordSerializer<OUT>) typeSerializersOut2.get(vertexName); + public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(Integer vertexID) { + return (StreamRecordSerializer<OUT>) typeSerializersOut2.get(vertexID); } @SuppressWarnings("unchecked") - public <IN> 
StreamRecordSerializer<IN> getInSerializer1(String vertexName) { - return (StreamRecordSerializer<IN>) typeSerializersIn1.get(vertexName); + public <IN> StreamRecordSerializer<IN> getInSerializer1(Integer vertexID) { + return (StreamRecordSerializer<IN>) typeSerializersIn1.get(vertexID); } @SuppressWarnings("unchecked") - public <IN> StreamRecordSerializer<IN> getInSerializer2(String vertexName) { - return (StreamRecordSerializer<IN>) typeSerializersIn2.get(vertexName); + public <IN> StreamRecordSerializer<IN> getInSerializer2(Integer vertexID) { + return (StreamRecordSerializer<IN>) typeSerializersIn2.get(vertexID); } /** @@ -497,7 +496,7 @@ public class StreamGraph extends StreamingPlan { * @param to * to */ - public void setSerializersFrom(String from, String to) { + public void setSerializersFrom(Integer from, Integer to) { operatorNames.put(to, operatorNames.get(from)); typeSerializersIn1.put(to, typeSerializersOut1.get(from)); @@ -539,33 +538,33 @@ public class StreamGraph extends StreamingPlan { this.chaining = chaining; } - public Set<Entry<String, StreamInvokable<?, ?>>> getInvokables() { + public Set<Entry<Integer, StreamInvokable<?, ?>>> getInvokables() { return invokableObjects.entrySet(); } - public Collection<String> getSources() { + public Collection<Integer> getSources() { return sources; } - public List<String> getOutEdges(String vertexName) { - return outEdgeLists.get(vertexName); + public List<Integer> getOutEdges(Integer vertexID) { + return outEdgeLists.get(vertexID); } - public List<String> getInEdges(String vertexName) { - return inEdgeLists.get(vertexName); + public List<Integer> getInEdges(Integer vertexID) { + return inEdgeLists.get(vertexID); } - public List<Integer> getOutEdgeTypes(String vertexName) { + public List<Integer> getOutEdgeTypes(Integer vertexID) { - return outEdgeTypes.get(vertexName); + return outEdgeTypes.get(vertexID); } - public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, String downStreamVertex) { + 
public StreamPartitioner<?> getOutPartitioner(Integer upStreamVertex, Integer downStreamVertex) { return outputPartitioners.get(upStreamVertex).get( outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); } - public List<String> getSelectedNames(String upStreamVertex, String downStreamVertex) { + public List<String> getSelectedNames(Integer upStreamVertex, Integer downStreamVertex) { return selectedNames.get(upStreamVertex).get( outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex)); @@ -575,40 +574,40 @@ public class StreamGraph extends StreamingPlan { return new HashSet<Integer>(iterationIds.values()); } - public String getIterationTail(int iterID) { - return iterationIDtoTailName.get(iterID); + public Integer getIterationTail(int iterID) { + return iterationIDtoTailID.get(iterID); } - public String getIterationHead(int iterID) { - return iterationIDtoHeadName.get(iterID); + public Integer getIterationHead(int iterID) { + return iterationIDtoHeadID.get(iterID); } - public Class<? extends AbstractInvokable> getJobVertexClass(String vertexName) { - return jobVertexClasses.get(vertexName); + public Class<? 
extends AbstractInvokable> getJobVertexClass(Integer vertexID) { + return jobVertexClasses.get(vertexID); } - public InputFormat<String, ?> getInputFormat(String vertexName) { - return inputFormatLists.get(vertexName); + public InputFormat<String, ?> getInputFormat(Integer vertexID) { + return inputFormatLists.get(vertexID); } - public List<OutputSelector<?>> getOutputSelector(String vertexName) { - return outputSelectors.get(vertexName); + public List<OutputSelector<?>> getOutputSelector(Integer vertexID) { + return outputSelectors.get(vertexID); } - public Map<String, OperatorState<?>> getState(String vertexName) { - return operatorStates.get(vertexName); + public Map<String, OperatorState<?>> getState(Integer vertexID) { + return operatorStates.get(vertexID); } - public Integer getIterationID(String vertexName) { - return iterationIds.get(vertexName); + public Integer getIterationID(Integer vertexID) { + return iterationIds.get(vertexID); } - public long getIterationTimeout(String vertexName) { - return iterationTimeouts.get(vertexName); + public long getIterationTimeout(Integer vertexID) { + return iterationTimeouts.get(vertexID); } - public String getOperatorName(String vertexName) { - return operatorNames.get(vertexName); + public String getOperatorName(Integer vertexID) { + return operatorNames.get(vertexID); } @Override @@ -621,18 +620,14 @@ public class StreamGraph extends StreamingPlan { JSONArray nodes = new JSONArray(); json.put("nodes", nodes); - List<Integer> operatorIDs = new ArrayList<Integer>(); - for (String id : operatorNames.keySet()) { - operatorIDs.add(Integer.valueOf(id)); - } + List<Integer> operatorIDs = new ArrayList<Integer>(operatorNames.keySet()); Collections.sort(operatorIDs); - for (Integer idInt : operatorIDs) { + for (Integer id : operatorIDs) { JSONObject node = new JSONObject(); nodes.put(node); - String id = idInt.toString(); - node.put("id", idInt); + node.put("id", id); node.put("type", getOperatorName(id)); if 
(sources.contains(id)) { @@ -658,12 +653,12 @@ public class StreamGraph extends StreamingPlan { for (int i = 0; i < numIn; i++) { - String inID = getInEdges(id).get(i); + Integer inID = getInEdges(id).get(i); JSONObject input = new JSONObject(); inputs.put(input); - input.put("id", Integer.valueOf(inID)); + input.put("id", inID); input.put("ship_strategy", getOutPartitioner(inID, id).getStrategy()); if (i == 0) { input.put("side", "first"); http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java index 9beea89..8ec8486 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java @@ -24,7 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang.StringUtils; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.AbstractJobVertex; @@ -49,24 +49,24 @@ public class StreamingJobGraphGenerator { private StreamGraph streamGraph; - private Map<String, AbstractJobVertex> streamVertices; + private Map<Integer, AbstractJobVertex> streamVertices; private JobGraph jobGraph; - private Collection<String> builtNodes; + private Collection<Integer> builtVertices; - private Map<String, Map<String, StreamConfig>> chainedConfigs; 
- private Map<String, StreamConfig> vertexConfigs; - private Map<String, String> chainedNames; + private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; + private Map<Integer, StreamConfig> vertexConfigs; + private Map<Integer, String> chainedNames; public StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; } private void init() { - this.streamVertices = new HashMap<String, AbstractJobVertex>(); - this.builtNodes = new HashSet<String>(); - this.chainedConfigs = new HashMap<String, Map<String, StreamConfig>>(); - this.vertexConfigs = new HashMap<String, StreamConfig>(); - this.chainedNames = new HashMap<String, String>(); + this.streamVertices = new HashMap<Integer, AbstractJobVertex>(); + this.builtVertices = new HashSet<Integer>(); + this.chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>(); + this.vertexConfigs = new HashMap<Integer, StreamConfig>(); + this.chainedNames = new HashMap<Integer, String>(); } public JobGraph createJobGraph(String jobName) { @@ -85,20 +85,20 @@ public class StreamingJobGraphGenerator { } private void setChaining() { - for (String sourceName : streamGraph.getSources()) { + for (Integer sourceName : streamGraph.getSources()) { createChain(sourceName, sourceName); } } - private List<Tuple2<String, String>> createChain(String startNode, String current) { + private List<Tuple2<Integer, Integer>> createChain(Integer startNode, Integer current) { - if (!builtNodes.contains(startNode)) { + if (!builtVertices.contains(startNode)) { - List<Tuple2<String, String>> transitiveOutEdges = new ArrayList<Tuple2<String, String>>(); - List<String> chainableOutputs = new ArrayList<String>(); - List<String> nonChainableOutputs = new ArrayList<String>(); + List<Tuple2<Integer, Integer>> transitiveOutEdges = new ArrayList<Tuple2<Integer, Integer>>(); + List<Integer> chainableOutputs = new ArrayList<Integer>(); + List<Integer> nonChainableOutputs = new ArrayList<Integer>(); - for (String outName : 
streamGraph.getOutEdges(current)) { + for (Integer outName : streamGraph.getOutEdges(current)) { if (isChainable(current, outName)) { chainableOutputs.add(outName); } else { @@ -106,12 +106,12 @@ public class StreamingJobGraphGenerator { } } - for (String chainable : chainableOutputs) { + for (Integer chainable : chainableOutputs) { transitiveOutEdges.addAll(createChain(startNode, chainable)); } - for (String nonChainable : nonChainableOutputs) { - transitiveOutEdges.add(new Tuple2<String, String>(current, nonChainable)); + for (Integer nonChainable : nonChainableOutputs) { + transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable)); createChain(nonChainable, nonChainable); } @@ -127,7 +127,7 @@ public class StreamingJobGraphGenerator { config.setChainStart(); config.setOutEdgesInOrder(transitiveOutEdges); - for (Tuple2<String, String> edge : transitiveOutEdges) { + for (Tuple2<Integer, Integer> edge : transitiveOutEdges) { connect(startNode, edge); } @@ -135,10 +135,10 @@ public class StreamingJobGraphGenerator { } else { - Map<String, StreamConfig> chainedConfs = chainedConfigs.get(startNode); + Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNode); if (chainedConfs == null) { - chainedConfigs.put(startNode, new HashMap<String, StreamConfig>()); + chainedConfigs.put(startNode, new HashMap<Integer, StreamConfig>()); } chainedConfigs.get(startNode).put(current, config); } @@ -146,113 +146,113 @@ public class StreamingJobGraphGenerator { return transitiveOutEdges; } else { - return new ArrayList<Tuple2<String, String>>(); + return new ArrayList<Tuple2<Integer, Integer>>(); } } - private String createChainedName(String vertexID, List<String> chainedOutputs) { - String vertexName = streamGraph.getOperatorName(vertexID); + private String createChainedName(Integer vertexID, List<Integer> chainedOutputs) { + String operatorName = streamGraph.getOperatorName(vertexID); if (chainedOutputs.size() > 1) { List<String> outputChainedNames = 
new ArrayList<String>(); - for (String chainable : chainedOutputs) { + for (Integer chainable : chainedOutputs) { outputChainedNames.add(chainedNames.get(chainable)); } - return vertexName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")"; + return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")"; } else if (chainedOutputs.size() == 1) { - return vertexName + " -> " + chainedNames.get(chainedOutputs.get(0)); + return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0)); } else { - return vertexName; + return operatorName; } } - private StreamConfig createProcessingVertex(String vertexName) { + private StreamConfig createProcessingVertex(Integer vertexID) { - AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexName)); + AbstractJobVertex vertex = new AbstractJobVertex(chainedNames.get(vertexID)); - vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName)); - if (streamGraph.getParallelism(vertexName) > 0) { - vertex.setParallelism(streamGraph.getParallelism(vertexName)); + vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexID)); + if (streamGraph.getParallelism(vertexID) > 0) { + vertex.setParallelism(streamGraph.getParallelism(vertexID)); } if (LOG.isDebugEnabled()) { - LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexName), - vertexName); + LOG.debug("Parallelism set: {} for {}", streamGraph.getParallelism(vertexID), + vertexID); } - if (streamGraph.getInputFormat(vertexName) != null) { - vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName)); + if (streamGraph.getInputFormat(vertexID) != null) { + vertex.setInputSplitSource(streamGraph.getInputFormat(vertexID)); } - streamVertices.put(vertexName, vertex); - builtNodes.add(vertexName); + streamVertices.put(vertexID, vertex); + builtVertices.add(vertexID); jobGraph.addVertex(vertex); return new StreamConfig(vertex.getConfiguration()); } - private void setVertexConfig(String vertexName, 
StreamConfig config, - List<String> chainableOutputs, List<String> nonChainableOutputs) { + private void setVertexConfig(Integer vertexID, StreamConfig config, + List<Integer> chainableOutputs, List<Integer> nonChainableOutputs) { - config.setVertexName(vertexName); - config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName)); + config.setVertexID(vertexID); + config.setBufferTimeout(streamGraph.getBufferTimeout(vertexID)); - config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName)); - config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName)); - config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName)); - config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName)); + config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexID)); + config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexID)); + config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexID)); + config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexID)); - config.setUserInvokable(streamGraph.getInvokable(vertexName)); - config.setOutputSelectors(streamGraph.getOutputSelector(vertexName)); - config.setOperatorStates(streamGraph.getState(vertexName)); + config.setUserInvokable(streamGraph.getInvokable(vertexID)); + config.setOutputSelectors(streamGraph.getOutputSelector(vertexID)); + config.setOperatorStates(streamGraph.getState(vertexID)); config.setNumberOfOutputs(nonChainableOutputs.size()); config.setOutputs(nonChainableOutputs); config.setChainedOutputs(chainableOutputs); - Class<? extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexName); + Class<? 
extends AbstractInvokable> vertexClass = streamGraph.getJobVertexClass(vertexID); if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) { - config.setIterationId(streamGraph.getIterationID(vertexName)); - config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName)); + config.setIterationId(streamGraph.getIterationID(vertexID)); + config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexID)); } - List<String> allOutputs = new ArrayList<String>(chainableOutputs); + List<Integer> allOutputs = new ArrayList<Integer>(chainableOutputs); allOutputs.addAll(nonChainableOutputs); - for (String output : allOutputs) { - config.setSelectedNames(output, streamGraph.getSelectedNames(vertexName, output)); + for (Integer output : allOutputs) { + config.setSelectedNames(output, streamGraph.getSelectedNames(vertexID, output)); } - vertexConfigs.put(vertexName, config); + vertexConfigs.put(vertexID, config); } - private <T> void connect(String headOfChain, Tuple2<String, String> edge) { + private <T> void connect(Integer headOfChain, Tuple2<Integer, Integer> edge) { - String upStreamVertexName = edge.f0; - String downStreamVertexName = edge.f1; + Integer upStreamvertexID = edge.f0; + Integer downStreamvertexID = edge.f1; - int outputIndex = streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName); + int outputIndex = streamGraph.getOutEdges(upStreamvertexID).indexOf(downStreamvertexID); AbstractJobVertex headVertex = streamVertices.get(headOfChain); - AbstractJobVertex downStreamVertex = streamVertices.get(downStreamVertexName); + AbstractJobVertex downStreamVertex = streamVertices.get(downStreamvertexID); StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration()); - StreamConfig upStreamConfig = headOfChain == upStreamVertexName ? new StreamConfig( + StreamConfig upStreamConfig = headOfChain.equals(upStreamvertexID) ?
new StreamConfig( headVertex.getConfiguration()) : chainedConfigs.get(headOfChain).get( - upStreamVertexName); + upStreamvertexID); - List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamVertexName); + List<Integer> outEdgeIndexList = streamGraph.getOutEdgeTypes(upStreamvertexID); int numOfInputs = downStreamConfig.getNumberOfInputs(); downStreamConfig.setInputIndex(numOfInputs++, outEdgeIndexList.get(outputIndex)); downStreamConfig.setNumberOfInputs(numOfInputs); - StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamVertexName, - downStreamVertexName); + StreamPartitioner<?> partitioner = streamGraph.getOutPartitioner(upStreamvertexID, + downStreamvertexID); - upStreamConfig.setPartitioner(downStreamVertexName, partitioner); + upStreamConfig.setPartitioner(downStreamvertexID, partitioner); if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) { downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE); @@ -262,13 +262,13 @@ public class StreamingJobGraphGenerator { if (LOG.isDebugEnabled()) { LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(), - headOfChain, downStreamVertexName); + headOfChain, downStreamvertexID); } } - private boolean isChainable(String vertexName, String outName) { + private boolean isChainable(Integer vertexID, Integer outName) { - StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexName); + StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID); StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName); return streamGraph.getInEdges(outName).size() == 1 @@ -276,8 +276,8 @@ public class StreamingJobGraphGenerator { && outInvokable.getChainingStrategy() == ChainingStrategy.ALWAYS && (headInvokable.getChainingStrategy() == ChainingStrategy.HEAD || headInvokable .getChainingStrategy() == ChainingStrategy.ALWAYS) - && streamGraph.getOutPartitioner(vertexName, outName).getStrategy() == 
PartitioningStrategy.FORWARD - && streamGraph.getParallelism(vertexName) == streamGraph.getParallelism(outName) + && streamGraph.getOutPartitioner(vertexID, outName).getStrategy() == PartitioningStrategy.FORWARD + && streamGraph.getParallelism(vertexID) == streamGraph.getParallelism(outName) && streamGraph.chaining; } http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java index f0fa772..62fd5cf 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/WindowingOptimzier.java @@ -30,13 +30,13 @@ import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscret public class WindowingOptimzier { public static void optimizeGraph(StreamGraph streamGraph) { - Set<Entry<String, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables(); - List<Tuple2<String, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<String, StreamDiscretizer<?>>>(); + Set<Entry<Integer, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables(); + List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<Integer, StreamDiscretizer<?>>>(); // Get the discretizers - for (Entry<String, StreamInvokable<?, ?>> entry : invokables) { + for (Entry<Integer, StreamInvokable<?, ?>> entry : invokables) { if (entry.getValue() instanceof StreamDiscretizer) { - discretizers.add(new Tuple2<String, StreamDiscretizer<?>>(entry.getKey(), + 
discretizers.add(new Tuple2<Integer, StreamDiscretizer<?>>(entry.getKey(), (StreamDiscretizer<?>) entry.getValue())); } } @@ -46,15 +46,15 @@ public class WindowingOptimzier { } private static void setDiscretizerReuse(StreamGraph streamGraph, - List<Tuple2<String, StreamDiscretizer<?>>> discretizers) { - List<Tuple2<StreamDiscretizer<?>, List<String>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<String>>>(); + List<Tuple2<Integer, StreamDiscretizer<?>>> discretizers) { + List<Tuple2<StreamDiscretizer<?>, List<Integer>>> matchingDiscretizers = new ArrayList<Tuple2<StreamDiscretizer<?>, List<Integer>>>(); - for (Tuple2<String, StreamDiscretizer<?>> discretizer : discretizers) { + for (Tuple2<Integer, StreamDiscretizer<?>> discretizer : discretizers) { boolean inMatching = false; - for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers) { - Set<String> discretizerInEdges = new HashSet<String>( + for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) { + Set<Integer> discretizerInEdges = new HashSet<Integer>( streamGraph.getInEdges(discretizer.f0)); - Set<String> matchingInEdges = new HashSet<String>( + Set<Integer> matchingInEdges = new HashSet<Integer>( streamGraph.getInEdges(matching.f1.get(0))); if (discretizer.f1.equals(matching.f0) @@ -65,17 +65,17 @@ public class WindowingOptimzier { } } if (!inMatching) { - List<String> matchingNames = new ArrayList<String>(); + List<Integer> matchingNames = new ArrayList<Integer>(); matchingNames.add(discretizer.f0); - matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<String>>( + matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<Integer>>( discretizer.f1, matchingNames)); } } - for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers) { - List<String> matchList = matching.f1; + for (Tuple2<StreamDiscretizer<?>, List<Integer>> matching : matchingDiscretizers) { + List<Integer> matchList = matching.f1; 
if (matchList.size() > 1) { - String first = matchList.get(0); + Integer first = matchList.get(0); for (int i = 1; i < matchList.size(); i++) { replaceDiscretizer(streamGraph, matchList.get(i), first); } @@ -83,26 +83,26 @@ public class WindowingOptimzier { } } - private static void replaceDiscretizer(StreamGraph streamGraph, String toReplace, - String replaceWith) { + private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplace, + Integer replaceWith) { // Convert to array to create a copy - List<String> outEdges = new ArrayList<String>(streamGraph.getOutEdges(toReplace)); + List<Integer> outEdges = new ArrayList<Integer>(streamGraph.getOutEdges(toReplace)); int numOutputs = outEdges.size(); // Reconnect outputs for (int i = 0; i < numOutputs; i++) { - String outName = outEdges.get(i); + Integer output = outEdges.get(i); - streamGraph.setEdge(replaceWith, outName, - streamGraph.getOutPartitioner(toReplace, outName), 0, new ArrayList<String>()); - streamGraph.removeEdge(toReplace, outName); + streamGraph.setEdge(replaceWith, output, + streamGraph.getOutPartitioner(toReplace, output), 0, new ArrayList<String>()); + streamGraph.removeEdge(toReplace, output); } - List<String> inEdges = new ArrayList<String>(streamGraph.getInEdges(toReplace)); + List<Integer> inEdges = new ArrayList<Integer>(streamGraph.getInEdges(toReplace)); // Remove inputs - for (String inName : inEdges) { - streamGraph.removeEdge(inName, toReplace); + for (Integer input : inEdges) { + streamGraph.removeEdge(input, toReplace); } streamGraph.removeVertex(toReplace); http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index f221e4b..46b71a5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -95,7 +95,7 @@ public class DataStream<OUT> { protected static Integer counter = 0; protected final StreamExecutionEnvironment environment; - protected final String id; + protected final Integer id; protected final String type; protected int degreeOfParallelism; protected List<String> userDefinedNames; @@ -124,7 +124,7 @@ public class DataStream<OUT> { } counter++; - this.id = counter.toString(); + this.id = counter; this.type = operatorType; this.environment = environment; this.degreeOfParallelism = environment.getDegreeOfParallelism(); @@ -166,7 +166,7 @@ public class DataStream<OUT> { * * @return ID of the DataStream */ - public String getId() { + public Integer getId() { return id; } @@ -1167,7 +1167,7 @@ public class DataStream<OUT> { * @param typeNumber * Number of the type (used at co-functions) */ - protected <X> void connectGraph(DataStream<X> inputStream, String outputID, int typeNumber) { + protected <X> void connectGraph(DataStream<X> inputStream, Integer outputID, int typeNumber) { for (DataStream<X> stream : inputStream.mergedStreams) { streamGraph.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber, inputStream.userDefinedNames); @@ -1260,9 +1260,9 @@ public class DataStream<OUT> { } } - private void validateMerge(String id) { + private void validateMerge(Integer id) { for (DataStream<OUT> ds : this.mergedStreams) { if (ds.getId().equals(id)) { throw new RuntimeException("A DataStream cannot be merged with itself"); } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java index 7df91f0..dc2e987 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java @@ -49,13 +49,16 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { private SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream; private WindowTransformation transformation; + protected boolean isPartitioned = false; protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> discretizedStream, - KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation) { + KeySelector<OUT, ?> groupByKey, WindowTransformation tranformation, + boolean isPartitioned) { super(); this.groupByKey = groupByKey; this.discretizedStream = discretizedStream; this.transformation = tranformation; + this.isPartitioned = isPartitioned; } public DataStream<OUT> flatten() { @@ -73,6 +76,8 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(), new WindowReducer<OUT>(reduceFunction)).merge(); + // If we merged a non-grouped reduce transformation we need to reduce + // again if (!isGrouped() && out.discretizedStream.invokable instanceof WindowMerger) { return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", 
out.getType(), new WindowReducer<OUT>(discretizedStream.clean(reduceFunction))); @@ -93,14 +98,10 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction, TypeInformation<R> returnType) { DiscretizedStream<R> out = partition(transformation).transform( - WindowTransformation.REDUCEWINDOW, "Window Map", returnType, - new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))); + WindowTransformation.MAPWINDOW, "Window Map", returnType, + new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction))).merge(); - if (isGrouped()) { - return out.merge(); - } else { - return out; - } + return out; } private <R> DiscretizedStream<R> transform(WindowTransformation transformation, @@ -120,29 +121,36 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { new WindowPartitioner<OUT>(groupByKey)).setParallelism(parallelism); out.groupByKey = null; + out.isPartitioned = true; return out; } else if (transformation != WindowTransformation.MAPWINDOW && parallelism != discretizedStream.getExecutionEnvironment() .getDegreeOfParallelism()) { - return transform(transformation, "Window partitioner", getType(), + DiscretizedStream<OUT> out = transform(transformation, "Window partitioner", getType(), new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism); + + out.isPartitioned = true; + + return out; } else { + this.isPartitioned = false; return this; } } private DiscretizedStream<OUT> setParallelism(int parallelism) { - return wrap(discretizedStream.setParallelism(parallelism)); + return wrap(discretizedStream.setParallelism(parallelism), isPartitioned); } private DiscretizedStream<OUT> merge() { TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType(); // Only merge partitioned streams - if (discretizedStream.invokable instanceof WindowPartitioner) { - return wrap(discretizedStream.groupBy(new 
WindowKey<OUT>()).transform("Window Merger", - type, new WindowMerger<OUT>())); + if (isPartitioned) { + return wrap( + discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger", + type, new WindowMerger<OUT>()), false); } else { return this; } @@ -150,14 +158,15 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { } - @SuppressWarnings("rawtypes") - private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream) { - return wrap(stream, transformation); + @SuppressWarnings({ "unchecked", "rawtypes" }) + private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream, boolean isPartitioned) { + return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation, isPartitioned); } @SuppressWarnings({ "unchecked", "rawtypes" }) private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream, WindowTransformation transformation) { - return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation); + return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, + transformation, isPartitioned); } @SuppressWarnings("rawtypes") @@ -220,7 +229,8 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> { } protected DiscretizedStream<OUT> copy() { - return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation); + return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation, + isPartitioned); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java index 36551d2..35b0324 100644 --- 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java @@ -268,10 +268,12 @@ public class WindowedDataStream<OUT> { WindowBuffer<OUT> windowBuffer = getWindowBuffer(transformation, getTrigger(), getEviction(), discretizerKey); + DiscretizedStream<OUT> discretized = discretize(transformation, windowBuffer); + if (windowBuffer instanceof CompletePreAggregator) { - return discretize(transformation, windowBuffer); + return discretized; } else { - return discretize(transformation, windowBuffer).reduceWindow(reduceFunction); + return discretized.reduceWindow(reduceFunction); } } @@ -337,7 +339,8 @@ public class WindowedDataStream<OUT> { .transform("Stream Discretizer", bufferEventType, discretizer) .setParallelism(parallelism) .transform("WindowBuffer", new StreamWindowTypeInfo<OUT>(getType()), - bufferInvokable).setParallelism(parallelism), groupByKey, transformation); + bufferInvokable).setParallelism(parallelism), groupByKey, transformation, + false); } http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java index 1a12cb2..911f060 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java @@ -51,9 +51,9 @@ public class OutputHandler<OUT> { public List<ChainableInvokable<?, ?>> chainedInvokables; - private Map<String, StreamOutput<?>> outputMap; - private Map<String, StreamConfig> chainedConfigs; - private List<Tuple2<String, String>> outEdgesInOrder; + private Map<Integer, StreamOutput<?>> outputMap; + private Map<Integer, StreamConfig> chainedConfigs; + private List<Tuple2<Integer, Integer>> outEdgesInOrder; public OutputHandler(StreamVertex<?, OUT> vertex) { @@ -61,19 +61,19 @@ public class OutputHandler<OUT> { this.vertex = vertex; this.configuration = new StreamConfig(vertex.getTaskConfiguration()); this.chainedInvokables = new ArrayList<ChainableInvokable<?, ?>>(); - this.outputMap = new HashMap<String, StreamOutput<?>>(); + this.outputMap = new HashMap<Integer, StreamOutput<?>>(); this.cl = vertex.getUserCodeClassLoader(); // We read the chained configs, and the order of record writer // registrations by outputname this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl); - this.chainedConfigs.put(configuration.getTaskName(), configuration); + this.chainedConfigs.put(configuration.getVertexID(), configuration); this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl); // We iterate through all the out edges from this job vertex and create // a stream output - for (Tuple2<String, String> outEdge : outEdgesInOrder) { + for (Tuple2<Integer, Integer> outEdge : outEdgesInOrder) { StreamOutput<?> streamOutput = createStreamOutput(outEdge.f1, chainedConfigs.get(outEdge.f0), outEdgesInOrder.indexOf(outEdge)); outputMap.put(outEdge.f1, streamOutput); @@ -111,7 +111,7 @@ public class OutputHandler<OUT> { chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>(); // Create collectors for the network outputs - for (String output : chainedTaskConfig.getOutputs(cl)) { + for (Integer output : 
chainedTaskConfig.getOutputs(cl)) { Collector<?> outCollector = outputMap.get(output); @@ -124,7 +124,7 @@ public class OutputHandler<OUT> { } // Create collectors for the chained outputs - for (String output : chainedTaskConfig.getChainedOutputs(cl)) { + for (Integer output : chainedTaskConfig.getChainedOutputs(cl)) { Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output)); if (isDirectEmit) { ((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector, @@ -157,7 +157,7 @@ public class OutputHandler<OUT> { } /** - * We create the StreamOutput for the specific output given by the name, and + * We create the StreamOutput for the specific output given by the id, and * the configuration of its source task * * @param outputVertex @@ -166,7 +166,7 @@ public class OutputHandler<OUT> { * The config of upStream task * @return */ - private <T> StreamOutput<T> createStreamOutput(String outputVertex, StreamConfig configuration, + private <T> StreamOutput<T> createStreamOutput(Integer outputVertex, StreamConfig configuration, int outputIndex) { StreamRecordSerializer<T> outSerializer = configuration http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java index 78cbbe5..38bba5e 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java @@ -94,7 +94,7 @@ public 
class DirectedOutputTest { @Test public void outputSelectorTest() throws Exception { - StreamExecutionEnvironment env = new TestStreamEnvironment(1, 128); + StreamExecutionEnvironment env = new TestStreamEnvironment(1, 32); SplitDataStream<Long> source = env.generateSequence(1, 11).split(new MyOutputSelector()); source.select(EVEN).addSink(new ListSink(EVEN)); http://git-wip-us.apache.org/repos/asf/flink/blob/a911559a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java index f111898..3163c46 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java @@ -122,6 +122,9 @@ public class WindowIntegrationTest implements Serializable { source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream() .addSink(new DistributedSink3()); + source.window(Count.of(5)).mapWindow(new IdentityWindowMap()).flatten() + .addSink(new DistributedSink4()); + env.execute(); // sum ( Count of 2 slide 3 ) @@ -180,6 +183,14 @@ public class WindowIntegrationTest implements Serializable { expected6.add(StreamWindow.fromElements(4)); expected6.add(StreamWindow.fromElements(10)); + validateOutput(expected6, DistributedSink3.windows); + + List<StreamWindow<Integer>> expected7 = new ArrayList<StreamWindow<Integer>>(); + 
expected7.add(StreamWindow.fromElements(1, 2, 2, 3, 4)); + expected7.add(StreamWindow.fromElements(5, 10, 11, 11)); + + validateOutput(expected7, DistributedSink4.windows); + } public static <R> void validateOutput(List<R> expected, List<R> actual) { @@ -263,4 +274,17 @@ public class WindowIntegrationTest implements Serializable { } } + + @SuppressWarnings("serial") + private static class DistributedSink4 implements SinkFunction<StreamWindow<Integer>> { + + public static List<StreamWindow<Integer>> windows = Collections + .synchronizedList(new ArrayList<StreamWindow<Integer>>()); + + @Override + public void invoke(StreamWindow<Integer> value) throws Exception { + windows.add(value); + } + + } }
