[ https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Iaroslav Zeigerman updated FLINK-21507:
---------------------------------------
Description:
When I use multiple custom partitioning operations in a row like this:
{code:scala}
stream
  .partitionCustom(<custom_partitioner1>, _.key1)
  .mapWithState(...)
  .partitionCustom(<custom_partitioner2>, _.key2)
  .map(...)
  ...
{code}
I see that only the last partitioning operation (custom_partitioner2) is reflected
in the DAG, while the first one is ignored entirely.
I've also confirmed from the application logs that the first partitioning wasn't
applied at runtime.
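For illustration, one way to observe at runtime which parallel subtask each record lands on is a rich map function that logs the subtask index. This is only a hypothetical sketch of such a check; the actual logging used here is not shown:
{code:scala}
import org.apache.flink.api.common.functions.RichMapFunction

// Hypothetical helper, for illustration only: logs the index of the parallel
// subtask that processes each record, which shows how records were partitioned.
class SubtaskLogger[T] extends RichMapFunction[T, T] {
  override def map(value: T): T = {
    println(s"subtask=${getRuntimeContext.getIndexOfThisSubtask} record=$value")
    value
  }
}

// Usage sketch: stream.partitionCustom(partitioner, _.key1).map(new SubtaskLogger[MyRecord])
{code}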
*UPD*
It seems the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
{code:scala}
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.datastream.DataStreamUtils
import org.apache.flink.streaming.api.scala._

case class TestRecord(key: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)

val testRecordSource = ...

// Partitions records by the hash of their string key.
val randomPartitioner = new Partitioner[String] {
  override def partition(key: String, numPartitions: Int): Int =
    math.abs(key.hashCode) % numPartitions
}

// First custom partitioning, applied directly after the source.
val firstPartitioning = env
  .addSource(testRecordSource)
  .partitionCustom(randomPartitioner, _.key)

// Reinterpret the custom-partitioned stream as a keyed stream.
val keyedStream = new KeyedStream(
  DataStreamUtils.reinterpretAsKeyedStream(
    firstPartitioning.javaStream,
    new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }
  )
)

// Second custom partitioning, applied after the keyed map.
keyedStream
  .map(identity(_))
  .partitionCustom(randomPartitioner, _.key)
  .map(identity(_))
{code}
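The graph below was presumably obtained by printing the execution plan; a minimal sketch of how to do that (not part of the original snippet):
{code:scala}
// Dump the streaming job graph as JSON; assumed to be how the plan below was produced.
println(env.getExecutionPlan)
{code}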
This code produces the following graph:
{code:javascript}
{
  "nodes" : [ {
    "id" : 22,
    "type" : "Source: Custom Source",
    "pact" : "Data Source",
    "contents" : "Source: Custom Source",
    "parallelism" : 1
  }, {
    "id" : 25,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 22,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 27,
    "type" : "Map",
    "pact" : "Operator",
    "contents" : "Map",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 25,
      "ship_strategy" : "CUSTOM",
      "side" : "second"
    } ]
  } ]
}
{code}
The expected behavior is that the first custom partitioning is applied as well, i.e. that the edge between the source (id 22) and the first Map (id 25) uses the CUSTOM ship strategy instead of FORWARD.
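For illustration, the edge into node 25 would then be expected to look roughly like this (a sketch of the expected plan, not actual output):
{code:javascript}
"predecessors" : [ {
  "id" : 22,
  "ship_strategy" : "CUSTOM",
  "side" : "second"
} ]
{code}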
> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)