[
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291771#comment-17291771
]
Iaroslav Zeigerman edited comment on FLINK-21507 at 2/26/21, 5:01 PM:
----------------------------------------------------------------------
Hey [~nobleyd]! Thanks for looking into this! I've investigated the issue further,
and it seems I hadn't provided enough details initially. The issue appears to
occur when `partitionCustom` is used together with
`DataStreamUtils.reinterpretAsKeyedStream`. Here is a complete example:
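{code:scala}
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

case class TestRecord(key: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
val testRecordSource = ...
// Deterministic hash partitioner; floorMod keeps the result non-negative,
// unlike math.abs, which stays negative for a hashCode of Int.MinValue
val randomPartitioner = new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)
}
// First custom partitioning: this is the step that is missing from the plan
val firstPartitioning = env
  .addSource(testRecordSource)
  .partitionCustom(randomPartitioner, _.key)
// Reinterpret the custom-partitioned stream as keyed by the same key
val keyedStream = new KeyedStream(
  DataStreamUtils.reinterpretAsKeyedStream(
    firstPartitioning.javaStream,
    new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }
  )
)
// Second custom partitioning: this one does appear (as CUSTOM) in the plan
keyedStream
  .map(identity(_))
  .partitionCustom(randomPartitioner, _.key)
  .map(identity(_))
{code}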
This code produces the following graph:
{code:javascript}
{
"nodes" : [ {
"id" : 22,
"type" : "Source: Custom Source",
"pact" : "Data Source",
"contents" : "Source: Custom Source",
"parallelism" : 1
}, {
"id" : 25,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 22,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 27,
"type" : "Map",
"pact" : "Operator",
"contents" : "Map",
"parallelism" : 1,
"predecessors" : [ {
"id" : 25,
"ship_strategy" : "CUSTOM",
"side" : "second"
} ]
} ]
}
{code}
As you can see, the first connection (from the source, id 22, to the first Map,
id 25) uses the FORWARD ship strategy rather than CUSTOM.
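To check this without eyeballing the JSON, the ship strategies can be pulled out of the plan string returned by `env.getExecutionPlan`, assuming it has the same format as the plan shown above. This is only a sketch: `PlanInspection` and `edges` are hypothetical helper names, not Flink APIs, and a regex stands in for a proper JSON parser purely to keep it dependency-free.

{code:scala}
import scala.util.matching.Regex

// Hypothetical helper, not part of Flink: extracts (predecessorId, shipStrategy)
// pairs from an execution-plan JSON string such as env.getExecutionPlan returns.
// A regex is used only to stay dependency-free; prefer a JSON library in real tests.
object PlanInspection {
  // Matches a predecessor entry: "id" : <n>, "ship_strategy" : "<STRATEGY>"
  private val Edge: Regex =
    """id"\s*:\s*(\d+)\s*,\s*"ship_strategy"\s*:\s*"(\w+)""".r

  def edges(planJson: String): List[(Int, String)] =
    Edge.findAllMatchIn(planJson).map(m => (m.group(1).toInt, m.group(2))).toList
}

// Trimmed copy of the two Map nodes from the plan above (predecessor entries only).
val plan =
  """{ "id" : 25, "predecessors" : [ { "id" : 22, "ship_strategy" : "FORWARD", "side" : "second" } ] }
    |{ "id" : 27, "predecessors" : [ { "id" : 25, "ship_strategy" : "CUSTOM", "side" : "second" } ] }""".stripMargin

val observed = PlanInspection.edges(plan) // List((22,FORWARD), (25,CUSTOM))
// Per the report, both edges should carry the CUSTOM strategy.
val expected = List("CUSTOM", "CUSTOM")
println(observed.map(_._2) == expected)   // false: the first CUSTOM edge became FORWARD
{code}

Comparing only the strategy names, rather than node ids such as 22 and 25 (which Flink generates), keeps a check like this stable if other operators are added to the job.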
> Given a sequence of multiple custom stream partitioning operations only last
> transformation in sequence is applied while others are dropped
> -------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:scala}
> stream
> .partitionCustom(<custom_partitioner1>, _.key1)
> .mapWithState(...)
> .partitionCustom(<custom_partitioner2>, _.key2)
> .map(...)
> ...
> {code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the first one is ignored entirely.
> I've also confirmed from the application logs that the first partitioning
> wasn't applied at runtime.
>