[
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291993#comment-17291993
]
Iaroslav Zeigerman edited comment on FLINK-21507 at 2/27/21, 1:12 AM:
----------------------------------------------------------------------
The alternative (simpler) workaround is to insert a dummy map() operator right
before reinterpreting the stream as a keyed stream:
{code:scala}
val firstPartitioning = env
  .addSource(testRecordSource)
  .partitionCustom(randomPartitioner, _.key)
  // Dummy pass-through operator inserted between the custom partitioning
  // and the reinterpretation below.
  .map(identity(_))
  .name("dummy-map")
  .uid("dummy-map")

val keyedStream = new KeyedStream(
  DataStreamUtils.reinterpretAsKeyedStream(
    firstPartitioning.javaStream,
    new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }
  )
)
{code}
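To verify that the workaround keeps the first custom partitioning in place, the
generated plan can be inspected, for example via getExecutionPlan (a minimal
sketch, assuming the pipeline above has already been defined on env):
{code:scala}
// Sketch only: dumps the stream graph as JSON; with the dummy map in place the
// edge produced by partitionCustom should show ship_strategy CUSTOM instead of
// being collapsed into FORWARD.
println(env.getExecutionPlan)
{code}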
> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:scala}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ...
> {code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the first one is ignored entirely.
> I've also confirmed from the application logs that the first partitioning was
> not applied at runtime.
> *UPD*
> The problem seems to be caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
> case class TestRecord(key: String)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(1)
>
> val testRecordSource = ...
>
> // Partitions records by the hash of their key.
> val randomPartitioner = new Partitioner[String] {
>   override def partition(key: String, numPartitions: Int): Int =
>     math.abs(key.hashCode) % numPartitions
> }
>
> val firstPartitioning = env
>   .addSource(testRecordSource)
>   .partitionCustom(randomPartitioner, _.key)
>
> val keyedStream = new KeyedStream(
>   DataStreamUtils.reinterpretAsKeyedStream(
>     firstPartitioning.javaStream,
>     new KeySelector[TestRecord, String] {
>       override def getKey(value: TestRecord): String = value.key
>     }
>   )
> )
>
> keyedStream
>   .map(identity(_))
>   .partitionCustom(randomPartitioner, _.key)
>   .map(identity(_))
>
> {code}
> This code produces the following DAG:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior is to have a CUSTOM connection in both cases (i.e. on
> both the 22 -> 25 edge and the 25 -> 27 edge) rather than FORWARD followed by
> CUSTOM.
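> For illustration, with the expected behavior the first edge of the plan above
> would look roughly like this (hypothetical fragment for node 25, reusing the
> node ids from the plan above):
> {code:javascript}
> "predecessors" : [ {
>   "id" : 22,
>   "ship_strategy" : "CUSTOM",
>   "side" : "second"
> } ]
> {code}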
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)