[ 
https://issues.apache.org/jira/browse/FLINK-2136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14573592#comment-14573592
 ] 

ASF GitHub Bot commented on FLINK-2136:
---------------------------------------

Github user mbalassi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/771#discussion_r31766241
  
    --- Diff: 
flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
 ---
    @@ -0,0 +1,475 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.scala
    +
    +import java.lang
    +
    +import org.apache.flink.api.common.functions._
    +import org.apache.flink.api.java.typeutils.TypeExtractor
    +import org.apache.flink.streaming.api.collector.selector.OutputSelector
    +import org.apache.flink.streaming.api.functions.co.CoMapFunction
    +import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph, 
StreamNode}
    +import 
org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, 
StreamOperator}
    +import org.apache.flink.streaming.api.windowing.helper.Count
    +import org.apache.flink.streaming.runtime.partitioner._
    +import org.apache.flink.util.Collector
    +import org.junit.Assert.fail
    +import org.junit.Test
    +
    +class DataStreamTest {
    +
    +  private val parallelism = 2
    +
    +  @Test
    +  def testNaming(): Unit = {
    +    val env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism)
    +
    +    val source1 = env.generateSequence(0, 0).name("testSource1")
    +    assert("testSource1" == source1.getName)
    +
    +    val dataStream1 = source1
    +      .map(x => 0L)
    +      .name("testMap")
    +    assert("testMap" == dataStream1.getName)
    +
    +    val dataStream2 = env.generateSequence(0, 0).name("testSource2")
    +      .reduce((x, y) => 0)
    +      .name("testReduce")
    +    assert("testReduce" == dataStream2.getName)
    +
    +    val connected = dataStream1.connect(dataStream2)
    +      .flatMap(
    +    { (in, out: Collector[Long]) => }, { (in, out: Collector[Long]) => }
    +    ).name("testCoFlatMap")
    +    assert("testCoFlatMap" == connected.getName)
    +
    +    val fu: ((Long, Long) => Long) =
    +      (x: Long, y: Long) => 0L
    +
    +    val windowed = connected.window(Count.of(10))
    +      .foldWindow(0L, fu)
    +
    +    windowed.name("testWindowFold")
    +    assert("testWindowFold" == windowed.getName)
    +
    +    windowed.flatten()
    +
    +    val plan = env.getExecutionPlan
    +
    +    assert(plan contains "testSource1")
    +    assert(plan contains "testSource2")
    +    assert(plan contains "testMap")
    +    assert(plan contains "testReduce")
    +    assert(plan contains "testCoFlatMap")
    +    assert(plan contains "testWindowFold")
    +  }
    +
    +  /**
    +   * Tests that {@link DataStream#groupBy} and {@link 
DataStream#partitionBy(KeySelector)} result in
    +   * different and correct topologies. Does the same for the {@link 
ConnectedDataStream}.
    +   */
    +  @Test
    +  def testPartitioning(): Unit = {
    +    val env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism);
    +    val graph: StreamGraph = env.getStreamGraph;
    +
    +    val src1: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
    +    val src2: DataStream[(Long, Long)] = env.fromElements((0L, 0L))
    +
    +    val connected = src1.connect(src2)
    +
    +    val group1 = src1.groupBy(0)
    +    val group2 = src1.groupBy(1, 0)
    +    val group3 = src1.groupBy("_1")
    +    val group4 = src1.groupBy(x => x._1)
    +
    +    assert(isPartitioned(graph.getStreamEdge(group1.getId, 
createDownStreamId(group1))));
    +    assert(isPartitioned(graph.getStreamEdge(group2.getId, 
createDownStreamId(group2))));
    --- End diff --
    
    Trivial: the trailing semicolons on these lines are unnecessary in Scala and can be removed.


> Test the streaming scala API
> ----------------------------
>
>                 Key: FLINK-2136
>                 URL: https://issues.apache.org/jira/browse/FLINK-2136
>             Project: Flink
>          Issue Type: Test
>          Components: Scala API, Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Assignee: Gábor Hermann
>
> There are no tests covering the streaming Scala API. I suggest testing 
> whether the StreamGraph created by a certain operation looks as expected. 
> Deeper layers and the runtime should not be tested here; that is done in 
> streaming-core.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to