pnowojski commented on a change in pull request #13109:
URL: https://github.com/apache/flink/pull/13109#discussion_r475525041
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/CopyingDirectedOutput.java
##########
@@ -36,31 +38,56 @@
@SuppressWarnings({"unchecked", "rawtypes"})
public CopyingDirectedOutput(
List<OutputSelector<OUT>> outputSelectors,
- List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs) {
- super(outputSelectors, outputs);
+ List<? extends Tuple2<? extends Output<StreamRecord<OUT>>, StreamEdge>> outputs,
+ Counter numRecordsOutForTask) {
+ super(outputSelectors, outputs, numRecordsOutForTask);
}
@Override
public void collect(StreamRecord<OUT> record) {
- Set<Output<StreamRecord<OUT>>> selectedOutputs = selectOutputs(record);
+ Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectedOutputs = selectOutputs(record);
Review comment:
There are two things that I'm not sure about here.
1. I don't think `Tuple2` is the best option. For one thing,
```
!selectedOutputs.f0.isEmpty()
```
or
```
protected Tuple2<Set<RecordWriterOutput<OUT>>, Set<Output<StreamRecord<OUT>>>> selectOutputs(StreamRecord<OUT> record)
```
are not very readable. Also, the `Tuple2` is one more object allocation on every `collect` call. I think it would be better to have two methods:
```
Set<...> selectChainedOutputs(record) {...}
Set<...> selectNonChainedOutputs(record) {...}
```
2. I think returning a new `Set<...>` from every call is also expensive and unnecessary, since `collect` runs once per record. This is a pre-existing problem, but maybe we could fix it here? Instead of `selectOutputs`, we could have:
```
void collectToChainedOutputs(StreamRecord<...> record,
SelectedOutputsCollector<...> collector) {...}
void collectToNonChainedOutputs(StreamRecord<...> record,
SelectedOutputsCollector<...> collector) {...}
```
Where
```
public interface SelectedOutputsCollector<...> {
void collect(Output<StreamRecord<...>> output, StreamRecord<...> record);
}
```
`SelectedOutputsCollector` would have four versions (or maybe two versions, each with an option to cover the other two cases? Or just a single implementation with two boolean flags?) for:
- emitting without bumping `numRecordsOutForTask`, without a copy
- emitting without bumping `numRecordsOutForTask`, with a copy
- emitting with bumping `numRecordsOutForTask`, without a copy
- emitting with bumping `numRecordsOutForTask`, with a copy
I think this would make the code cleaner while also saving allocations. The one downside is that we would lose the optimisation that avoids the shallow copy for the last output, but that costs just one extra `StreamRecord` allocation, which should be outweighed by the savings on `Set` allocations.
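A minimal Java sketch of point 2, assuming the "single implementation with two boolean flags" option. `Rec`, `Out`, `Route`, `FlaggedCollector`, `collectToOutputs`, and `demo` are hypothetical, simplified stand-ins rather than Flink classes: the counter is a plain `long` instead of a Flink `Counter`, and each `Predicate` stands in for `OutputSelector` logic. It shows selected outputs being handed straight to the collector, so no `Set` or `Tuple2` is allocated per record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

final class SelectedOutputsSketch {

    /** Hypothetical stand-in for Flink's StreamRecord: an immutable wrapper around a value. */
    static final class Rec<T> {
        final T value;

        Rec(T value) {
            this.value = value;
        }

        /** Shallow copy: a new wrapper that shares the same value object. */
        Rec<T> copy() {
            return new Rec<>(value);
        }
    }

    /** Hypothetical stand-in for Flink's Output<StreamRecord<T>>. */
    interface Out<T> {
        void collect(Rec<T> record);
    }

    /** The collector proposed above, made generic over the element type. */
    interface SelectedOutputsCollector<T> {
        void collect(Out<T> output, Rec<T> record);
    }

    /** One implementation covering all four cases via two flags. */
    static final class FlaggedCollector<T> implements SelectedOutputsCollector<T> {
        private final boolean countRecordsOut;
        private final boolean copy;
        /** Plain field standing in for the numRecordsOutForTask Counter. */
        long numRecordsOutForTask;

        FlaggedCollector(boolean countRecordsOut, boolean copy) {
            this.countRecordsOut = countRecordsOut;
            this.copy = copy;
        }

        @Override
        public void collect(Out<T> output, Rec<T> record) {
            if (countRecordsOut) {
                numRecordsOutForTask++;
            }
            // With copying enabled, every selected output gets its own shallow copy.
            output.collect(copy ? record.copy() : record);
        }
    }

    /** Hypothetical pairing of a selection predicate with the output it guards. */
    static final class Route<T> {
        final Predicate<T> selector;
        final Out<T> output;

        Route(Predicate<T> selector, Out<T> output) {
            this.selector = selector;
            this.output = output;
        }
    }

    /** Passes each selected output directly to the collector; nothing is allocated here. */
    static <T> void collectToOutputs(List<Route<T>> routes, Rec<T> record, SelectedOutputsCollector<T> collector) {
        for (Route<T> route : routes) {
            if (route.selector.test(record.value)) {
                collector.collect(route.output, record);
            }
        }
    }

    /** Routes the values 4 and 3 through two non-chained outputs; returns what the outputs saw. */
    static List<String> demo(SelectedOutputsCollector<Integer> collector) {
        List<String> received = new ArrayList<>();
        List<Route<Integer>> nonChained = List.of(
                new Route<Integer>(v -> v % 2 == 0, r -> received.add("even:" + r.value)),
                new Route<Integer>(v -> v > 0, r -> received.add("positive:" + r.value)));
        collectToOutputs(nonChained, new Rec<>(4), collector);
        collectToOutputs(nonChained, new Rec<>(3), collector);
        return received;
    }

    public static void main(String[] args) {
        FlaggedCollector<Integer> collector = new FlaggedCollector<>(true, true);
        System.out.println(demo(collector));                // [even:4, positive:4, positive:3]
        System.out.println(collector.numRecordsOutForTask); // 3
    }
}
```

In this sketch each output appears in exactly one `Route`, so no `Set` is needed for deduplication; if several selector names can map to the same output in the real `DirectedOutput`, duplicate emissions would still have to be avoided some other way. Also, with copying enabled every emission gets a shallow copy, including the last one, which is the trade-off described in the comment.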
----------------------------------------------------------------
This is an automated message from the Apache Git Service.