aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r430206700



##########
File path: flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput underlyingOutput) {
         * an output ID that can be used to get a deferred or immediate {@link WatermarkOutput} for that
         * output.
         */
-       public int registerNewOutput() {
-               int newOutputId = nextOutputId;
-               nextOutputId++;
-               OutputState outputState = new OutputState();
-               watermarkPerOutputId.put(newOutputId, outputState);
+       public void registerNewOutput(String id) {
+               final OutputState outputState = new OutputState();
+
+               final OutputState previouslyRegistered = watermarkPerOutputId.putIfAbsent(id, outputState);
+               checkState(previouslyRegistered == null, "Already contains an output for ID %s", id);
+
                watermarkOutputs.add(outputState);
-               return newOutputId;
+       }
+
+       public boolean unregisterOutput(String id) {
+               return watermarkPerOutputId.remove(id) != null;

Review comment:
       Ah no, I wasn't thinking. It is of course possible: we're removing 
the same instance, so `equals` works...
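
To illustrate the point about removing the same instance, here is a minimal, self-contained sketch. It is not the code from this PR: `IdentityRemovalSketch`, its `register`/`unregister`/`size` methods, and the empty `OutputState` stand-in are hypothetical names chosen for illustration, and the sketch assumes the state class does not override `equals`. Under that assumption, `List.remove(Object)` falls back to `Object.equals`, which is reference identity, so passing the instance that was just taken out of the map removes exactly that entry from the list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class IdentityRemovalSketch {

    // Hypothetical stand-in for the per-output state; it deliberately
    // does NOT override equals/hashCode.
    static final class OutputState {}

    private final Map<String, OutputState> perId = new HashMap<>();
    private final List<OutputState> outputs = new ArrayList<>();

    void register(String id) {
        final OutputState state = new OutputState();
        if (perId.putIfAbsent(id, state) != null) {
            throw new IllegalStateException("Already contains an output for ID " + id);
        }
        outputs.add(state);
    }

    boolean unregister(String id) {
        final OutputState removed = perId.remove(id);
        // List.remove(Object) compares with equals(); without an override this
        // is Object.equals, i.e. reference identity, so only this exact
        // instance is removed from the list.
        return removed != null && outputs.remove(removed);
    }

    int size() {
        return outputs.size();
    }

    public static void main(String[] args) {
        IdentityRemovalSketch sketch = new IdentityRemovalSketch();
        sketch.register("a");
        sketch.register("b");
        System.out.println(sketch.unregister("a")); // first removal succeeds
        System.out.println(sketch.unregister("a")); // nothing left to remove for "a"
        System.out.println(sketch.size());
    }
}
```

If the state class did define a value-based `equals`, the list removal could match a different but equal instance, so the identity argument would no longer hold as stated.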



