[hotfix] Make DataStream property methods properly Scalaesk This also includes some minor cleanups
This closes #1689 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3c6646e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3c6646e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3c6646e Branch: refs/heads/release-1.0 Commit: f3c6646e68750a068b3325181b8a16a4689a0fed Parents: df19a8b Author: Stephan Ewen <[email protected]> Authored: Mon Feb 22 18:37:59 2016 +0100 Committer: Robert Metzger <[email protected]> Committed: Fri Feb 26 20:56:24 2016 +0100 ---------------------------------------------------------------------- .../streaming/api/datastream/DataStream.java | 2 +- .../flink/streaming/api/graph/StreamNode.java | 9 +- .../api/graph/StreamGraphGeneratorTest.java | 7 +- .../flink/streaming/api/scala/DataStream.scala | 108 ++++++++++++++----- 4 files changed, 91 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index ddba7d6..1cd8ade 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -128,7 +128,7 @@ public class DataStream<T> { * @return ID of the DataStream */ @Internal - public Integer getId() { + public int getId() { return transformation.getId(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java index 8605ce0..533f1e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java @@ -40,7 +40,7 @@ public class StreamNode implements Serializable { transient private StreamExecutionEnvironment env; - private final Integer id; + private final int id; private Integer parallelism = null; private Long bufferTimeout = null; private final String operatorName; @@ -124,7 +124,7 @@ public class StreamNode implements Serializable { return inEdgeIndices; } - public Integer getId() { + public int getId() { return id; } @@ -264,12 +264,11 @@ public class StreamNode implements Serializable { } StreamNode that = (StreamNode) o; - - return id.equals(that.id); + return id == that.id; } @Override public int hashCode() { - return id.hashCode(); + return id; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java index 734199b..d1f92c6 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -15,6 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.flink.streaming.api.graph; import org.apache.flink.api.common.ExecutionConfig; @@ -39,7 +40,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.streaming.util.EvenOddOutputSelector; import org.apache.flink.streaming.util.NoOpIntMap; import org.apache.flink.streaming.util.NoOpSink; -import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; + import org.junit.Test; import static org.junit.Assert.assertEquals; http://git-wip-us.apache.org/repos/asf/flink/blob/f3c6646e/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala ---------------------------------------------------------------------- diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala index 3522b51..04a8a5f 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala @@ -18,7 +18,8 @@ package org.apache.flink.streaming.api.scala -import org.apache.flink.annotation.{PublicEvolving, Public} +import org.apache.flink.annotation.{Internal, PublicEvolving, Public} +import org.apache.flink.api.common.ExecutionConfig import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner} import org.apache.flink.api.common.io.OutputFormat import org.apache.flink.api.common.typeinfo.TypeInformation @@ -43,30 +44,79 @@ import scala.collection.JavaConverters._ class DataStream[T](stream: JavaStream[T]) { /** - * Gets the underlying java DataStream object. - */ - def javaStream: JavaStream[T] = stream - - /** * Returns the [[StreamExecutionEnvironment]] associated with the current [[DataStream]]. * - * @return associated execution environment + * @return associated execution environment + * @deprecated Use [[executionEnvironment]] instead */ + @deprecated + @PublicEvolving def getExecutionEnvironment: StreamExecutionEnvironment = new StreamExecutionEnvironment(stream.getExecutionEnvironment) /** - * Returns the ID of the DataStream. - * - * @return ID of the DataStream + * Returns the TypeInformation for the elements of this DataStream. + * + * @deprecated Use [[dataType]] instead. */ + @deprecated @PublicEvolving - def getId = stream.getId + def getType(): TypeInformation[T] = stream.getType() /** + * Returns the parallelism of this operation. + * + * @deprecated Use [[parallelism]] instead. + */ + @deprecated + @PublicEvolving + def getParallelism = stream.getParallelism + + /** + * Returns the execution config. + * + * @deprecated Use [[executionConfig]] instead. + */ + @deprecated + @PublicEvolving + def getExecutionConfig = stream.getExecutionConfig + + /** + * Returns the ID of the DataStream. + */ + @Internal + private[flink] def getId = stream.getId() + + // -------------------------------------------------------------------------- + // Scalaesk accessors + // -------------------------------------------------------------------------- + + /** + * Gets the underlying java DataStream object. + */ + def javaStream: JavaStream[T] = stream + + /** * Returns the TypeInformation for the elements of this DataStream. */ - def getType(): TypeInformation[T] = stream.getType() + def dataType: TypeInformation[T] = stream.getType() + + /** + * Returns the execution config. + */ + def executionConfig: ExecutionConfig = stream.getExecutionConfig() + + /** + * Returns the [[StreamExecutionEnvironment]] associated with this data stream + */ + def executionEnvironment: StreamExecutionEnvironment = + new StreamExecutionEnvironment(stream.getExecutionEnvironment()) + + + /** + * Returns the parallelism of this operation. + */ + def parallelism: Int = stream.getParallelism() /** * Sets the parallelism of this operation. This must be at least 1. @@ -75,34 +125,36 @@ class DataStream[T](stream: JavaStream[T]) { stream match { case ds: SingleOutputStreamOperator[_, _] => ds.setParallelism(parallelism) case _ => - throw new UnsupportedOperationException("Operator " + stream.toString + " cannot " + - "have " + - "parallelism.") + throw new UnsupportedOperationException( + "Operator " + stream + " cannot set the parallelism.") } this } /** - * Returns the parallelism of this operation. - */ - def getParallelism = stream.getParallelism - - /** - * Returns the execution config. - */ - def getExecutionConfig = stream.getExecutionConfig - - /** * Gets the name of the current data stream. This name is * used by the visualization and logging during runtime. * * @return Name of the stream. */ - def getName : String = stream match { + def name: String = stream match { case stream : SingleOutputStreamOperator[T,_] => stream.getName case _ => throw new UnsupportedOperationException("Only supported for operators.") } + + // -------------------------------------------------------------------------- + + /** + * Gets the name of the current data stream. This name is + * used by the visualization and logging during runtime. + * + * @return Name of the stream. + * @deprecated Use [[name]] instead + */ + @deprecated + @PublicEvolving + def getName : String = name /** * Sets the name of the current data stream. This name is @@ -209,6 +261,10 @@ class DataStream[T](stream: JavaStream[T]) { this } + // -------------------------------------------------------------------------- + // Stream Transformations + // -------------------------------------------------------------------------- + /** * Creates a new DataStream by merging DataStream outputs of * the same type with each other. The DataStreams merged using this operator
