[ https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17307088#comment-17307088 ]

David Anderson commented on FLINK-21507:
----------------------------------------

[~izeigerman] I believe that what you are trying to do here doesn't really work.

While your workaround may initially behave correctly, should the job ever need
to restore its state from a checkpoint or savepoint (i.e., during recovery or
rescaling), the state won't be correctly partitioned after the restore.
Instead, during recovery each instance will load the state for the key groups
assigned to that instance by Flink's standard partitioner, which is not the
same partitioning as the one implemented by the custom partitioner. So the part
of the keyspace that the custom partitioner routes to an instance won't overlap
much with the part of the keyspace loaded into that instance during the state
restore, and most of the state you need will appear to be missing.
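To make the mismatch concrete, here is a small self-contained Scala sketch. It deliberately does not depend on Flink: `KeyGroupSketch`, `flinkInstanceFor`, `customInstanceFor` and `mismatchRatio` are hypothetical names, and the hashing re-implements, as far as I recall it, the arithmetic in Flink's `KeyGroupRangeAssignment` (murmur-hash the key's `hashCode`, take it modulo the max parallelism to get a key group, then map contiguous key-group ranges to instances). The custom partitioner is assumed to be a plain hash-modulo one like the one in the issue, and a max parallelism of 128 is assumed. Treat the output as illustrative only.

```scala
// Hypothetical sketch: approximates Flink's key-group assignment without Flink,
// to compare it with a hash-modulo custom partitioner.
object KeyGroupSketch {

  // Approximation of Flink's MathUtils.murmurHash(int); returns a non-negative Int.
  def murmurHash(hashCode: Int): Int = {
    var code = hashCode
    code *= 0xcc9e2d51
    code = Integer.rotateLeft(code, 15)
    code *= 0x1b873593
    code = Integer.rotateLeft(code, 13)
    code = code * 5 + 0xe6546b64
    code ^= 4
    code = bitMix(code)
    if (code >= 0) code
    else if (code != Int.MinValue) -code
    else 0
  }

  // Final avalanche step used by the murmur hash above.
  private def bitMix(input: Int): Int = {
    var h = input
    h ^= h >>> 16
    h *= 0x85ebca6b
    h ^= h >>> 13
    h *= 0xc2b2ae35
    h ^= h >>> 16
    h
  }

  // Key group of a key (approximates assignToKeyGroup).
  def keyGroupFor(key: String, maxParallelism: Int): Int =
    murmurHash(key.hashCode) % maxParallelism

  // Instance whose key-group range contains the key, i.e. the instance that
  // loads this key's state on restore (approximates computeOperatorIndexForKeyGroup).
  def flinkInstanceFor(key: String, maxParallelism: Int, parallelism: Int): Int =
    keyGroupFor(key, maxParallelism) * parallelism / maxParallelism

  // Assumed custom partitioner: plain hash modulo; floorMod keeps it non-negative.
  def customInstanceFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  // Fraction of keys whose records are routed to a different instance than
  // the one that restores their state.
  def mismatchRatio(keys: Seq[String], maxParallelism: Int, parallelism: Int): Double = {
    val mismatches = keys.count { k =>
      flinkInstanceFor(k, maxParallelism, parallelism) != customInstanceFor(k, parallelism)
    }
    mismatches.toDouble / keys.size
  }

  def main(args: Array[String]): Unit = {
    val keys = (1 to 10000).map(i => s"user-$i")
    // Simulate restoring (or rescaling) to a few different parallelisms.
    for (p <- Seq(2, 3, 4)) {
      val ratio = mismatchRatio(keys, maxParallelism = 128, parallelism = p)
      println(f"parallelism=$p%d mismatched keys=${ratio * 100}%.1f%%")
    }
  }
}
```

Because the two assignments use unrelated hash functions, one would expect roughly 1 - 1/parallelism of the keys to be mismatched, which is why most of the state appears to be missing after a restore. Only with parallelism 1 do the two schemes trivially agree.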



> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
>                 Key: FLINK-21507
>                 URL: https://issues.apache.org/jira/browse/FLINK-21507
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.11.0
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:scala}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....
> {code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the 1st one is ignored entirely.
> I've also confirmed from application logs that the 1st partitioning wasn't
> applied at runtime.
> *UPD*
> It seems the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
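> {code:scala}
>     import org.apache.flink.api.common.functions.Partitioner
>     import org.apache.flink.api.java.functions.KeySelector
>     import org.apache.flink.streaming.api.TimeCharacteristic
>     import org.apache.flink.streaming.api.datastream.DataStreamUtils
>     import org.apache.flink.streaming.api.scala._
>
>     case class TestRecord(key: String)
>
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>     env.setParallelism(1)
>     val testRecordSource = ...
>     // Hash-based partitioner; floorMod avoids a negative index when
>     // key.hashCode == Int.MinValue (math.abs would return a negative value).
>     val randomPartitioner = new Partitioner[String] {
>       override def partition(key: String, numPartitions: Int): Int =
>         Math.floorMod(key.hashCode, numPartitions)
>     }
>     // 1st partitioning: custom partitioner on TestRecord.key
>     val firstPartitioning = env
>       .addSource(testRecordSource)
>       .partitionCustom(randomPartitioner, _.key)
>     // Reinterpret the custom-partitioned stream as keyed (no shuffle expected)
>     val keyedStream = new KeyedStream(
>       DataStreamUtils.reinterpretAsKeyedStream(
>         firstPartitioning.javaStream,
>         new KeySelector[TestRecord, String] {
>           override def getKey(value: TestRecord): String = value.key
>         }
>       )
>     )
>     // 2nd partitioning: custom partitioner again
>     keyedStream
>       .map(identity(_))
>       .partitionCustom(randomPartitioner, _.key)
>       .map(identity(_))
> {code}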
> This code produces the following DAG:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior is to have a CUSTOM connection in both cases, rather
> than FORWARD followed by CUSTOM.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
