aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429840970



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
         * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
         * output.
         */
-       public int registerNewOutput() {
-               int newOutputId = nextOutputId;
-               nextOutputId++;
-               OutputState outputState = new OutputState();
-               watermarkPerOutputId.put(newOutputId, outputState);
+       public void registerNewOutput(String id) {
+               final OutputState outputState = new OutputState();
+
+               final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+               checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
                watermarkOutputs.add(outputState);
-               return newOutputId;
+       }
+
+       public boolean unregisterOutput(String id) {
+               return watermarkPerOutputId.remove(id) != null;

Review comment:
       This does not remove the output from `watermarkOutputs`. Please add a 
test that verifies correct behaviour when removing outputs.
   
   With the current design, it's actually not possible to remove the output 
from `watermarkOutputs`. One possible solution is to get rid of that list and 
always use the Map for iterating the outputs in `onPeriodicEmit()`. That would 
be a smidge slower but I think that's ok because periodic watermark emission 
does not happen super often.
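
   A minimal sketch of the map-only idea, to make the suggestion concrete. This is not the Flink class: `MapOnlyMultiplexerSketch`, `updateWatermark` and the `long` return value of `onPeriodicEmit()` are hypothetical simplifications, and idleness handling and the underlying `WatermarkOutput` are left out. It only shows that once the map is the single collection of outputs, `unregisterOutput` cannot leave stale state behind.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified, hypothetical stand-in for WatermarkOutputMultiplexer (not the Flink class). */
class MapOnlyMultiplexerSketch {

    /** Per-output state; this sketch tracks only the last watermark. */
    static final class OutputState {
        long watermark = Long.MIN_VALUE;
    }

    // The map is the only collection of outputs, so there is no second list to keep in sync.
    private final Map<String, OutputState> watermarkPerOutputId = new LinkedHashMap<>();

    private long combinedWatermark = Long.MIN_VALUE;

    void registerNewOutput(String id) {
        OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, new OutputState());
        if (previouslyRegistered != null) {
            throw new IllegalStateException("Already contains an output for ID " + id);
        }
    }

    boolean unregisterOutput(String id) {
        return watermarkPerOutputId.remove(id) != null;
    }

    /** Hypothetical helper: records a watermark for one output, never moving it backwards. */
    void updateWatermark(String id, long watermark) {
        OutputState state = watermarkPerOutputId.get(id);
        if (state == null) {
            throw new IllegalStateException("No output registered for ID " + id);
        }
        state.watermark = Math.max(state.watermark, watermark);
    }

    /** Combines the watermarks by iterating the map values instead of a separate list. */
    long onPeriodicEmit() {
        if (watermarkPerOutputId.isEmpty()) {
            return combinedWatermark;
        }
        long minimum = Long.MAX_VALUE;
        for (OutputState state : watermarkPerOutputId.values()) {
            minimum = Math.min(minimum, state.watermark);
        }
        // The combined watermark is the minimum over all outputs and never decreases.
        combinedWatermark = Math.max(combinedWatermark, minimum);
        return combinedWatermark;
    }
}
```

   A test for removal could follow the same shape: register two outputs, advance both, unregister the slower one, and check that the next periodic emit is no longer held back by it.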



