[
https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311268#comment-17311268
]
nobleyd commented on FLINK-21507:
---------------------------------
Hi, [~izeigerman]. I don't understand the original purpose of your code, and I
don't think it makes sense. Why not just use `keyBy(key)` in this case?
'DataStreamUtils.reinterpretAsKeyedStream' is meant for reinterpreting a
'DataStream' that is already partitioned by key, in the same way 'keyBy' would
partition it. For example, you sink a keyedStream to a Kafka topic using
fixedKafkaPartitioner and later read that topic back.
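To illustrate why the existing partitioning has to match what 'keyBy' would produce, here is a minimal, dependency-free sketch. It assumes 'keyBy' routes a record by key group (key group = hash % maxParallelism, subtask = keyGroup * parallelism / maxParallelism). Flink additionally scrambles the hash with a murmur hash first, which is left out here, and the class and method names are hypothetical.

```java
final class PartitionMismatch {
    // Simplified stand-in for keyBy routing (assumption: the murmur-hash step is omitted).
    static int keyBySubtask(int keyHash, int maxParallelism, int parallelism) {
        int keyGroup = Math.floorMod(keyHash, maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    // Modulo partitioner in the style of the one in the issue description.
    static int customPartition(int keyHash, int numPartitions) {
        return Math.floorMod(keyHash, numPartitions);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        // Print both routing decisions side by side for a few sample hashes.
        for (int keyHash = 0; keyHash < 4; keyHash++) {
            System.out.println(keyHash
                + ": keyBy=" + keyBySubtask(keyHash, maxParallelism, parallelism)
                + " custom=" + customPartition(keyHash, parallelism));
        }
    }
}
```

With these sample values the two schemes disagree for most hashes, so keyed state would be looked up on a subtask that does not own the key. With a parallelism of 1 both always return 0.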
Or are you just experiencing data skew? Then you need to split the key into
key_1, key_2, and so on, or adjust the max parallelism, since it also
influences how 'keyBy' distributes the keys.
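The key-splitting idea can be sketched as follows. This is a minimal, Flink-independent illustration; the `KeySalting` class, its method names, and the bucket count are hypothetical and chosen only for this example.

```java
final class KeySalting {
    // Appends a bucket suffix (1..buckets) so one hot key becomes several sub-keys.
    static String saltedKey(String key, long sequence, int buckets) {
        return key + "_" + (Math.floorMod(sequence, buckets) + 1);
    }

    // Recovers the original key by dropping the suffix after the last underscore.
    static String originalKey(String salted) {
        return salted.substring(0, salted.lastIndexOf('_'));
    }

    public static void main(String[] args) {
        // Consecutive records of the same hot key alternate between the sub-keys.
        for (long seq = 0; seq < 4; seq++) {
            System.out.println(saltedKey("key", seq, 2));
        }
    }
}
```

In a Flink job the suffix would be attached to the record in a map step before 'keyBy', because a KeySelector has to return the same key every time it sees the same record. The partial results per sub-key then need a second aggregation keyed by the original key.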
> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:java}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....{code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the first one is ignored entirely.
> I've also confirmed from the application logs that the first partitioning
> wasn't applied at runtime.
> *UPD*
> Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
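> import org.apache.flink.api.common.functions.Partitioner
> import org.apache.flink.api.java.functions.KeySelector
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.datastream.DataStreamUtils
> import org.apache.flink.streaming.api.scala._
>
> case class TestRecord(key: String)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(1)
> val testRecordSource = ...
> // Hash-based partitioner; floorMod keeps the result non-negative even for Int.MinValue hashes.
> val randomPartitioner = new Partitioner[String] {
>   override def partition(key: String, numPartitions: Int): Int =
>     Math.floorMod(key.hashCode, numPartitions)
> }
> // First custom partitioning; this is the one missing from the DAG.
> val firstPartitioning = env
>   .addSource(testRecordSource)
>   .partitionCustom(randomPartitioner, _.key)
> // Reinterpret the custom-partitioned stream as keyed, without another shuffle.
> val keyedStream = new KeyedStream(
>   DataStreamUtils.reinterpretAsKeyedStream(
>     firstPartitioning.javaStream,
>     new KeySelector[TestRecord, String] {
>       override def getKey(value: TestRecord): String = value.key
>     }
>   )
> )
> keyedStream
>   .map(identity(_))
>   // Second custom partitioning; this one shows up as CUSTOM in the DAG.
>   .partitionCustom(randomPartitioner, _.key)
>   .map(identity(_))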
>
> {code}
> This code produces the following DAG:
> {code:javascript}
> {
> "nodes" : [ {
> "id" : 22,
> "type" : "Source: Custom Source",
> "pact" : "Data Source",
> "contents" : "Source: Custom Source",
> "parallelism" : 1
> }, {
> "id" : 25,
> "type" : "Map",
> "pact" : "Operator",
> "contents" : "Map",
> "parallelism" : 1,
> "predecessors" : [ {
> "id" : 22,
> "ship_strategy" : "FORWARD",
> "side" : "second"
> } ]
> }, {
> "id" : 27,
> "type" : "Map",
> "pact" : "Operator",
> "contents" : "Map",
> "parallelism" : 1,
> "predecessors" : [ {
> "id" : 25,
> "ship_strategy" : "CUSTOM",
> "side" : "second"
> } ]
> } ]
> }
> {code}
> The expected behavior is a CUSTOM connection in both cases, rather than
> FORWARD followed by CUSTOM.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)