StephanEwen commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429846873
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput
underlyingOutput) {
* an output ID that can be used to get a deferred or immediate {@link
WatermarkOutput} for that
* output.
*/
- public int registerNewOutput() {
- int newOutputId = nextOutputId;
- nextOutputId++;
- OutputState outputState = new OutputState();
- watermarkPerOutputId.put(newOutputId, outputState);
+ public void registerNewOutput(String id) {
+ final OutputState outputState = new OutputState();
+
+ final OutputState previouslyRegistered =
watermarkPerOutputId.putIfAbsent(id, outputState);
+ checkState(previouslyRegistered == null, "Already contains an
output for ID %s", id);
+
watermarkOutputs.add(outputState);
- return newOutputId;
+ }
+
+ public boolean unregisterOutput(String id) {
+ return watermarkPerOutputId.remove(id) != null;
Review comment:
Why is it not possible to remove from `watermarkOutputs`? It is a linear
operation (`List.remove()`), but then again, it doesn't happen very often.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]