[ https://issues.apache.org/jira/browse/FLINK-21507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17291824#comment-17291824 ]
Iaroslav Zeigerman edited comment on FLINK-21507 at 2/26/21, 6:11 PM:
----------------------------------------------------------------------
The problem seems to originate here:
[https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStreamUtils.java#L207]
where DataStreamUtils.reinterpretAsKeyedStream enforces the ForwardPartitioner strategy.
The following workaround worked for me:
{code:scala}
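import java.lang.reflect.Constructor

// The Java* aliases are assumed from the names used in the method body.
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.datastream.{DataStream => JavaDataStream, KeyedStream => JavaKeyedStream}
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream}
import org.apache.flink.streaming.api.transformations.PartitionTransformation
import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper

def reinterpretAsKeyedStream[T, K](
    stream: DataStream[T],
    keySelector: KeySelector[T, K],
    partitioner: Partitioner[K]
): KeyedStream[T, K] = {
  // A workaround we use to create a keyed stream with a custom partitioning strategy.
  // Unfortunately the existing DataStreamUtils.reinterpretAsKeyedStream helper method
  // overrides the upstream partitioning strategy with Forward partitioning, which is
  // not desirable when we want to uniformly reshuffle the input stream.
  val javaStream = stream.javaStream
  val typeInfo: TypeInformation[K] =
    TypeExtractor.getKeySelectorTypes(keySelector, javaStream.getType)
  val partitionTransformation: PartitionTransformation[T] =
    new PartitionTransformation[T](
      javaStream.getTransformation,
      new CustomPartitionerWrapper[K, T](partitioner, keySelector)
    )
  // The four-argument Java KeyedStream constructor is not public,
  // so it has to be accessed via reflection.
  val constructor: Constructor[JavaKeyedStream[T, K]] =
    classOf[JavaKeyedStream[T, K]].getDeclaredConstructor(
      classOf[JavaDataStream[T]],
      classOf[PartitionTransformation[T]],
      classOf[KeySelector[T, K]],
      classOf[TypeInformation[K]]
    )
  constructor.setAccessible(true)
  new KeyedStream[T, K](
    constructor.newInstance(javaStream, partitionTransformation, keySelector, typeInfo)
  )
}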
{code}
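For reference, a minimal sketch of how the helper above might be used and checked. It assumes reinterpretAsKeyedStream from the snippet above is in scope. TestRecord, the partitioner and the fromElements source are placeholders for illustration only, not part of the original job. Printing the execution plan should make it possible to see which ship_strategy each edge ends up with.
{code:scala}
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.scala._

// Placeholder record type, for illustration only.
case class TestRecord(key: String)

object WorkaroundUsage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Placeholder partitioner; floorMod keeps the result non-negative.
    val partitioner = new Partitioner[String] {
      override def partition(key: String, numPartitions: Int): Int =
        Math.floorMod(key.hashCode, numPartitions)
    }
    val keySelector = new KeySelector[TestRecord, String] {
      override def getKey(value: TestRecord): String = value.key
    }

    // Placeholder source standing in for the real one.
    val source: DataStream[TestRecord] =
      env.fromElements(TestRecord("a"), TestRecord("b"))

    // Assumption: reinterpretAsKeyedStream is the workaround helper shown above.
    val keyed = reinterpretAsKeyedStream(source, keySelector, partitioner)

    keyed
      .map(identity(_))
      .partitionCustom(partitioner, _.key)
      .map(identity(_))
      .print()

    // Prints the JSON plan so the ship_strategy of each edge can be inspected.
    println(env.getExecutionPlan)
  }
}
{code}
Note that a stream built this way is partitioned by the custom partitioner rather than by Flink's key groups, so I would not assume that keyed state works correctly downstream of it.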
> Reinterpreting stream as keyed breaks the upstream partitioning
> ---------------------------------------------------------------
>
> Key: FLINK-21507
> URL: https://issues.apache.org/jira/browse/FLINK-21507
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream, API / Scala
> Affects Versions: 1.11.0
> Reporter: Iaroslav Zeigerman
> Priority: Major
>
> When I use multiple custom partitioning operations in a row like this:
> {code:scala}
> stream
>   .partitionCustom(<custom_partitioner1>, _.key1)
>   .mapWithState(...)
>   .partitionCustom(<custom_partitioner2>, _.key2)
>   .map(...)
>   ....
> {code}
> I see that only the last partitioning operation (custom_partitioner2) is
> reflected in the DAG, while the first one is ignored entirely.
> From the application logs I've also confirmed that the first partitioning
> wasn't applied at runtime.
> *UPD*
> Seems like the problem is caused by DataStreamUtils.reinterpretAsKeyedStream:
> {code:scala}
> case class TestRecord(key: String)
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.setParallelism(1)
> val testRecordSource = ...
> val randomPartitioner = new Partitioner[String] {
>   override def partition(key: String, numPartitions: Int): Int =
>     math.abs(key.hashCode) % numPartitions
> }
> val firstPartitioning = env
>   .addSource(testRecordSource)
>   .partitionCustom(randomPartitioner, _.key)
> val keyedStream = new KeyedStream(
>   DataStreamUtils.reinterpretAsKeyedStream(
>     firstPartitioning.javaStream,
>     new KeySelector[TestRecord, String] {
>       override def getKey(value: TestRecord): String = value.key
>     }
>   )
> )
> keyedStream
>   .map(identity(_))
>   .partitionCustom(randomPartitioner, _.key)
>   .map(identity(_))
> {code}
> This code produces the following graph:
> {code:javascript}
> {
>   "nodes" : [ {
>     "id" : 22,
>     "type" : "Source: Custom Source",
>     "pact" : "Data Source",
>     "contents" : "Source: Custom Source",
>     "parallelism" : 1
>   }, {
>     "id" : 25,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 22,
>       "ship_strategy" : "FORWARD",
>       "side" : "second"
>     } ]
>   }, {
>     "id" : 27,
>     "type" : "Map",
>     "pact" : "Operator",
>     "contents" : "Map",
>     "parallelism" : 1,
>     "predecessors" : [ {
>       "id" : 25,
>       "ship_strategy" : "CUSTOM",
>       "side" : "second"
>     } ]
>   } ]
> }
> {code}
> The expected behavior is a CUSTOM connection in both cases, rather than
> FORWARD followed by CUSTOM.
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)