[ 
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311268#comment-17311268
 ] 

nobleyd commented on FLINK-21507:
---------------------------------

Hi, [~izeigerman]. I don't know what is the original purpose for you code. I 
don't think it make some sense. Why not just use `keyBy(key)` in this case? 
'DataStreamUtils.reinterpretAsKeyedStream' is used to reinterpret a 
'DataStream' which is key-based partitioned originally. For example, you sink a 
keyedStream to a kafka topic using fixedKafkaPartitioner. 

Or you are just experiencing a data skew? Then you  need to split the key to 
key_1 and key_2...... Or adjust the max parallelism since it also influences 
the distribute of 'keyBy'.

> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
>                 Key: FLINK-21507
>                 URL: https://issues.apache.org/jira/browse/FLINK-21507
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.11.0
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:java}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....{code}
> I see that only last partitioning operation (custom_partitioner2) is 
> reflected in the DAG while the 1st one is ignored entirely.
> I've also confirmed that the 1st partitioning wasn't applied at runtime from 
> application logs.
> *UPD*
> Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
>     case class TestRecord(key: String)
>     
>     val env = StreamExecutionEnvironment.getExecutionEnvironment 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
>     env.setParallelism(1)
>     val testRecordSoruce = ...
>     val randomPartitioner = new Partitioner[String] {
>       override def partition(key: String, numPartitions: Int): Int = 
> math.abs(key.hashCode) % numPartitions
>     }
>     val firstPartitioning = env
>       .addSource(testRecordSoruce)
>       .partitionCustom(randomPartitioner, _.key)
>     val keyedStream = new KeyedStream(
>       DataStreamUtils.reinterpretAsKeyedStream(
>         firstPartitioning.javaStream,
>         new KeySelector[TestRecord, String] {
>           override def getKey(value: TestRecord): String = value.key
>         }
>       )
>     )
>     keyedStream
>       .map(identity(_))
>       .partitionCustom(randomPartitioner, _.key)
>       .map(identity(_))
>  
>  {code}
> This code produces the following DAG:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior to have CUSTOM connection in both cases vs FORWARD then 
> CUSTOM.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to