[ 
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291771#comment-17291771
 ] 

Iaroslav Zeigerman edited comment on FLINK-21507 at 2/26/21, 5:01 PM:
----------------------------------------------------------------------

Hey [~nobleyd]! Thanks for looking into this! I've investigated issue more and 
seems like I haven't provided enough details initially. It seems like the issue 
occurs when using `partitionCustom` together with 
`DataStreamUtils.reinterpretAsKeyedStream`. Here is a complete example:
{code:scala}
    case class TestRecord(key: String)
    
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.setParallelism(1)

    val testRecordSoruce = ...

    val randomPartitioner = new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int = 
math.abs(key.hashCode) % numPartitions
    }

    val firstPartitioning = env
      .addSource(testRecordSoruce)
      .partitionCustom(randomPartitioner, _.key)

    val keyedStream = new KeyedStream(
      DataStreamUtils.reinterpretAsKeyedStream(
        firstPartitioning.javaStream,
        new KeySelector[TestRecord, String] {
          override def getKey(value: TestRecord): String = value.key
        }
      )
    )

    keyedStream
      .map(identity(_))
      .partitionCustom(randomPartitioner, _.key)
      .map(identity(_))
 
 {code}
This code produces the following graph:
{code:javascript}
{
  "nodes" : [ {
    "id" : 22,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 25,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 22,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 27,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 25,
      "ship_strategy" : "CUSTOM",
      "side" : "second"
    } ]
  } ]
}
{code}
As you might see the first connection is specified to be FORWARD rather than 
CUSTOM.


was (Author: izeigerman):
Hey [~nobleyd]! Thanks for looking into this! I've investigated issue more and 
seems like I haven't provided enough details initially. It seems like the issue 
occurs when using `partitionCustom` together with 
`DataStreamUtils.reinterpretAsKeyedStream`. Here is a complete example:
{code:scala}
    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
    env.setParallelism(1)

    val testRecordSoruce = ...

    val randomPartitioner = new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int = 
math.abs(key.hashCode) % numPartitions
    }

    val firstPartitioning = env
      .addSource(testRecordSoruce)
      .partitionCustom(randomPartitioner, _.key)

    val keyedStream = new KeyedStream(
      DataStreamUtils.reinterpretAsKeyedStream(
        firstPartitioning.javaStream,
        new KeySelector[TestRecord, String] {
          override def getKey(value: TestRecord): String = value.key
        }
      )
    )

    keyedStream
      .map(identity(_))
      .partitionCustom(randomPartitioner, _.key)
      .map(identity(_))
 
 {code}
This code produces the following graph:

{code:javascript}
{
  "nodes" : [ {
    "id" : 22,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 25,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 22,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 27,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 25,
      "ship_strategy" : "CUSTOM",
      "side" : "second"
    } ]
  } ]
}
{code}
As you might see the first connection is specified to be FORWARD rather than 
CUSTOM.


> Given a sequence of multiple custom stream partitioning operations only last 
> transformation in sequence is applied while others are dropped
> -------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-21507
>                 URL: https://issues.apache.org/jira/browse/FLINK-21507
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, API / Scala
>    Affects Versions: 1.11.0
>            Reporter: Iaroslav Zeigerman
>            Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:java}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....{code}
> I see that only last partitioning operation (custom_partitioner2) is 
> reflected in the DAG while the 1st one is ignored entirely.
> I've also confirmed that the 1st partitioning wasn't applied at runtime from 
> application logs.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to