[
https://issues.apache.org/jira/browse/FLINK-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573592#comment-14573592
]
ASF GitHub Bot commented on FLINK-2136:
---------------------------------------
Github user mbalassi commented on a diff in the pull request:
https://github.com/apache/flink/pull/771#discussion_r31766241
--- Diff:
flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
---
@@ -0,0 +1,475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import java.lang
+
+import org.apache.flink.api.common.functions._
+import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.functions.co.CoMapFunction
+import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph,
StreamNode}
+import
org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator,
StreamOperator}
+import org.apache.flink.streaming.api.windowing.helper.Count
+import org.apache.flink.streaming.runtime.partitioner._
+import org.apache.flink.util.Collector
+import org.junit.Assert.fail
+import org.junit.Test
+
+class DataStreamTest {
+
+ private val parallelism = 2
+
+ @Test
+ def testNaming(): Unit = {
+ val env =
StreamExecutionEnvironment.createLocalEnvironment(parallelism)
+
+ val source1 = env.generateSequence(0, 0).name("testSource1")
+ assert("testSource1" == source1.getName)
+
+ val dataStream1 = source1
+ .map(x => 0L)
+ .name("testMap")
+ assert("testMap" == dataStream1.getName)
+
+ val dataStream2 = env.generateSequence(0, 0).name("testSource2")
+ .reduce((x, y) => 0)
+ .name("testReduce")
+ assert("testReduce" == dataStream2.getName)
+
+ val connected = dataStream1.connect(dataStream2)
+ .flatMap(
+ { (in, out: Collector[Long]) => }, { (in, out: Collector[Long]) => }
+ ).name("testCoFlatMap")
+ assert("testCoFlatMap" == connected.getName)
+
+ val fu: ((Long, Long) => Long) =
+ (x: Long, y: Long) => 0L
+
+ val windowed = connected.window(Count.of(10))
+ .foldWindow(0L, fu)
+
+ windowed.name("testWindowFold")
+ assert("testWindowFold" == windowed.getName)
+
+ windowed.flatten()
+
+ val plan = env.getExecutionPlan
+
+ assert(plan contains "testSource1")
+ assert(plan contains "testSource2")
+ assert(plan contains "testMap")
+ assert(plan contains "testReduce")
+ assert(plan contains "testCoFlatMap")
+ assert(plan contains "testWindowFold")
+ }
+
+ /**
+ * Tests that {@link DataStream#groupBy} and {@link
DataStream#partitionBy(KeySelector)} result in
+ * different and correct topologies. Does the same for the {@link
ConnectedDataStream}.
+ */
+ @Test
+ def testPartitioning(): Unit = {
+ val env =
StreamExecutionEnvironment.createLocalEnvironment(parallelism);
+ val graph: StreamGraph = env.getStreamGraph;
+
+ val src1: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
+ val src2: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
+
+ val connected = src1.connect(src2)
+
+ val group1 = src1.groupBy(0)
+ val group2 = src1.groupBy(1, 0)
+ val group3 = src1.groupBy("_1")
+ val group4 = src1.groupBy(x => x._1)
+
+ assert(isPartitioned(graph.getStreamEdge(group1.getId,
createDownStreamId(group1))));
+ assert(isPartitioned(graph.getStreamEdge(group2.getId,
createDownStreamId(group2))));
--- End diff --
Trivial: unnecessary trailing semicolon.
> Test the streaming scala API
> ----------------------------
>
> Key: FLINK-2136
> URL: https://issues.apache.org/jira/browse/FLINK-2136
> Project: Flink
> Issue Type: Test
> Components: Scala API, Streaming
> Affects Versions: 0.9
> Reporter: Márton Balassi
> Assignee: Gábor Hermann
>
> There are no tests covering the streaming Scala API. I would suggest testing
> whether the StreamGraph created by a certain operation looks as expected.
> Deeper layers and the runtime should not be tested here; that is done in
> streaming-core.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)