[ 
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291824#comment-17291824
 ] 

Iaroslav Zeigerman edited comment on FLINK-21507 at 2/26/21, 6:17 PM:
----------------------------------------------------------------------

Seems like the problem might reside here - 
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java#L207]
 where we enforce the ForwardPartitioner strategy.
 The following workaround worked for me:
{code:scala}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.datastream.{DataStream => JavaDataStream, 
KeyedStream => JavaKeyedStream}
import org.apache.flink.streaming.api.transformations.PartitionTransformation
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper

  def reinterpretAsKeyedStream[T, K](
      stream: DataStream[T],
      keySelector: KeySelector[T, K],
      partitioner: Partitioner[K]
  ): KeyedStream[T, K] = {
    // A workaround we use to create a keyed stream with a custom partitioning 
strategy.
    // Unfortunately the existing DataStreamUtils.reinterpretAsKeyedStream 
helper method overrides
    // the upstream partitioning strategy with Forward partitioning which is 
not desirable when
    // we want to uniformly reshuffle the input stream.

    val javaStream                   = stream.javaStream
    val typeInfo: TypeInformation[K] = 
TypeExtractor.getKeySelectorTypes(keySelector, javaStream.getType)

    val partitionTransformation: PartitionTransformation[T] =
      new PartitionTransformation[T](
        javaStream.getTransformation,
        new CustomPartitionerWrapper[K, T](partitioner, keySelector)
      )

    val constructor: Constructor[JavaKeyedStream[T, K]] =
      classOf[JavaKeyedStream[T, K]].getDeclaredConstructor(
        classOf[JavaDataStream[T]],
        classOf[PartitionTransformation[T]],
        classOf[KeySelector[T, K]],
        classOf[TypeInformation[K]]
      )
    constructor.setAccessible(true)

    new KeyedStream[T, K](constructor.newInstance(javaStream, 
partitionTransformation, keySelector, typeInfo))
  }
{code}


was (Author: izeigerman):
Seems like the problem might reside here - 
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java#L207]
 where we enforce the ForwardPartitioner strategy.
 The following workaround worked for me:
{code:scala}
  def reinterpretAsKeyedStream[T, K](
      stream: DataStream[T],
      keySelector: KeySelector[T, K],
      partitioner: Partitioner[K]
  ): KeyedStream[T, K] = {
    // A workaround we use to create a keyed stream with a custom partitioning 
strategy.
    // Unfortunately the existing DataStreamUtils.reinterpretAsKeyedStream 
helper method overrides
    // the upstream partitioning strategy with Forward partitioning which is 
not desirable when
    // we want to uniformly reshuffle the input stream.

    val javaStream                   = stream.javaStream
    val typeInfo: TypeInformation[K] = 
TypeExtractor.getKeySelectorTypes(keySelector, javaStream.getType)

    val partitionTransformation: PartitionTransformation[T] =
      new PartitionTransformation[T](
        javaStream.getTransformation,
        new CustomPartitionerWrapper[K, T](partitioner, keySelector)
      )

    val constructor: Constructor[JavaKeyedStream[T, K]] =
      classOf[JavaKeyedStream[T, K]].getDeclaredConstructor(
        classOf[JavaDataStream[T]],
        classOf[PartitionTransformation[T]],
        classOf[KeySelector[T, K]],
        classOf[TypeInformation[K]]
      )
    constructor.setAccessible(true)

    new KeyedStream[T, K](constructor.newInstance(javaStream, 
partitionTransformation, keySelector, typeInfo))
  }
{code}

> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
>                 Key: FLINK-21507
>                 URL: https://issues.apache.org/jira/browse/FLINK-21507
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.11.0
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:java}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....{code}
> I see that only last partitioning operation (custom_partitioner2) is 
> reflected in the DAG while the 1st one is ignored entirely.
> I've also confirmed that the 1st partitioning wasn't applied at runtime from 
> application logs.
> *UPD*
> Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
>     case class TestRecord(key: String)
>     
>     val env = StreamExecutionEnvironment.getExecutionEnvironment 
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
>     env.setParallelism(1)
>     val testRecordSoruce = ...
>     val randomPartitioner = new Partitioner[String] {
>       override def partition(key: String, numPartitions: Int): Int = 
> math.abs(key.hashCode) % numPartitions
>     }
>     val firstPartitioning = env
>       .addSource(testRecordSoruce)
>       .partitionCustom(randomPartitioner, _.key)
>     val keyedStream = new KeyedStream(
>       DataStreamUtils.reinterpretAsKeyedStream(
>         firstPartitioning.javaStream,
>         new KeySelector[TestRecord, String] {
>           override def getKey(value: TestRecord): String = value.key
>         }
>       )
>     )
>     keyedStream
>       .map(identity(_))
>       .partitionCustom(randomPartitioner, _.key)
>       .map(identity(_))
>  
>  {code}
> This code produces the following graph:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior to have CUSTOM connection in both cases vs FORWARD then 
> CUSTOM.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to