pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r475525041
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
##########
@@ -36,31 +38,56 @@
@SuppressWarnings({"unchecked", "rawtypes"})
public CopyingDirectedOutput(
List<OutputSelector<OUT>> outputSelectors,
- List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
- super(outputSelectors, outputs);
+ List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs,
+ Counter numRecordsOutForTask) {
+ super(outputSelectors, outputs, numRecordsOutForTask);
}
@Override
public void collect(StreamRecord<OUT> record) {
- Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+ Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectedOutputs = selectOutputs(record);
Review comment:
There are two things that I'm not sure about here.
1. I don't think `Tuple2` is the best option here. For one thing, expressions like
```
!selectedOutputs.f0.isEmpty()
```
or signatures like
```
protected Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectOutputs(StreamRecord<OUT> record)
```
are not very readable. It is also one more object allocation (the `Tuple2`). I
think it would be better to have two methods:
```
Set<...> selectChainedOutputs(record) {...}
Set<...> selectNonChainedOutputs(record) {...}
```
2. Returning a new `Set<...>` from every call is, I think, also very expensive
and unnecessary. This is a pre-existing problem, but maybe we could fix it?
Instead of `selectOutputs` we could have:
```
void collectToChainedOutputs(StreamRecord<...> record,
SelectedOutputsCollector<...> collector) {...}
void selectNonChainedOutputs(StreamRecord<...> record,
SelectedOutputsCollector<...> collector) {...}
```
Where
```
public interface SelectedOutputsCollector {
void collect(Output<StreamRecord<...>> output, StreamRecord<...> record);
}
```
`SelectedOutputsCollector` would have 4 versions (or maybe two versions plus an
option to configure the other two? or maybe just a single implementation with 2
boolean flags?) for:
- emitting without bumping `numRecordsOutForTask` without a copy
- emitting without bumping `numRecordsOutForTask` with a copy
- emitting with bumping `numRecordsOutForTask` without a copy
- emitting with bumping `numRecordsOutForTask` with a copy
I think this could make the code cleaner while also saving on allocations. The
one downside is that we would lose the optimisation that avoids a shallow
copy for the last output, but that is just one extra `StreamRecord` allocation,
which should be outweighed by the savings on `Set` allocations.
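To make the "single implementation with 2 boolean flags" variant concrete, here is a rough sketch. It is only an illustration under assumptions: `SketchRecord`, `SketchOutput`, `SketchCounter` and `FlaggedOutputsCollector` are hypothetical stand-ins rather than the real Flink classes, and the generic parameter on `SelectedOutputsCollector` is my guess at what the elided `<...>` would be.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types: simplified placeholders, NOT the real Flink classes.
final class SketchRecord<T> {
    final T value;
    SketchRecord(T value) { this.value = value; }
    // Hypothetical shallow copy: new wrapper object, same payload.
    SketchRecord<T> shallowCopy() { return new SketchRecord<>(value); }
}

interface SketchOutput<T> {
    void collect(SketchRecord<T> record);
}

final class SketchCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Interface shape taken from the comment above; generics are an assumption.
interface SelectedOutputsCollector<T> {
    void collect(SketchOutput<T> output, SketchRecord<T> record);
}

// One implementation covering all four combinations via two flags.
final class FlaggedOutputsCollector<T> implements SelectedOutputsCollector<T> {
    private final boolean copy;   // emit a shallow copy instead of the record itself
    private final boolean count;  // bump numRecordsOutForTask on every emit
    private final SketchCounter numRecordsOutForTask;

    FlaggedOutputsCollector(boolean copy, boolean count, SketchCounter numRecordsOutForTask) {
        this.copy = copy;
        this.count = count;
        this.numRecordsOutForTask = numRecordsOutForTask;
    }

    @Override
    public void collect(SketchOutput<T> output, SketchRecord<T> record) {
        if (count) {
            numRecordsOutForTask.inc();
        }
        output.collect(copy ? record.shallowCopy() : record);
    }
}

final class SelectedOutputsCollectorSketch {
    public static void main(String[] args) {
        SketchCounter counter = new SketchCounter();
        List<SketchRecord<String>> received = new ArrayList<>();
        SketchOutput<String> output = received::add;

        SelectedOutputsCollector<String> countingCopying =
            new FlaggedOutputsCollector<>(true, true, counter);
        countingCopying.collect(output, new SketchRecord<>("a"));

        System.out.println(received.size() + " record(s), counter=" + counter.getCount());
    }
}
```

The collector instances would be created once and reused across `collect` calls, so no `Set` or `Tuple2` is allocated per record. The only per-record allocation left is the shallow copy, when the copy flag is set.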
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -739,40 +749,59 @@ public void collect(StreamRecord<T> record) {
static class BroadcastingOutputCollector<T> implements WatermarkGaugeExposingOutput<StreamRecord<T>> {
Review comment:
Since you are modifying these classes and increasing their size, I would
suggest moving them out of `OperatorChain` (which is already a
bit too big).
I have already implemented just that as part of another, independent
effort. Could you cherry-pick this commit of mine:
https://github.com/pnowojski/flink/commit/368de7dd0fb041e1231220d91434cc3a7a07f147
and base your change on it?
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
##########
@@ -409,7 +410,7 @@ public OP getHeadOperator() {
if (selectors == null || selectors.isEmpty()) {
// simple path, no selector necessary
- if (allOutputs.size() == 1) {
+ if (allOutputs.size() == 1 && !(allOutputs.get(0).f0 instanceof RecordWriterOutput)) {
Review comment:
This change forces `BroadcastingOutputCollector` to always be used, which
I would expect to have a performance impact. Why not simply use
`CountingOutput` and/or reuse the operator-level metric in this case?
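Roughly what I have in mind for the single-output case is sketched below. This is only an illustration under assumptions: `DemoOutput`, `DemoCounter`, `CountingDemoOutput` and `wrapSingleOutput` are hypothetical stand-ins and not the actual Flink `CountingOutput` or `OperatorChain` code. The idea is to keep the simple path and wrap the lone output in a thin counting decorator only when it writes to the network.

```java
// Stand-in types: simplified placeholders, NOT the real Flink classes.
interface DemoOutput<T> {
    void collect(T record);
}

final class DemoCounter {
    private long count;
    void inc() { count++; }
    long getCount() { return count; }
}

// Hypothetical counting decorator: bump the counter, then forward the record.
final class CountingDemoOutput<T> implements DemoOutput<T> {
    private final DemoOutput<T> delegate;
    private final DemoCounter numRecordsOut;

    CountingDemoOutput(DemoOutput<T> delegate, DemoCounter numRecordsOut) {
        this.delegate = delegate;
        this.numRecordsOut = numRecordsOut;
    }

    @Override
    public void collect(T record) {
        numRecordsOut.inc();
        delegate.collect(record);
    }
}

final class SingleOutputSketch {
    // Hypothetical helper for the "simple path": a single output is used
    // directly, wrapped for counting only if it is a network (non-chained) output.
    static <T> DemoOutput<T> wrapSingleOutput(
            DemoOutput<T> onlyOutput, boolean isNetworkOutput, DemoCounter numRecordsOutForTask) {
        if (isNetworkOutput) {
            return new CountingDemoOutput<>(onlyOutput, numRecordsOutForTask);
        }
        return onlyOutput;
    }
}
```

In this sketch the chained case returns the original output untouched, so it pays nothing extra, and the network case pays one counter increment per record instead of going through a broadcasting collector.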