aljoscha commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r430206700
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkOutputMultiplexer.java
##########
@@ -88,13 +85,17 @@ public WatermarkOutputMultiplexer(WatermarkOutput
underlyingOutput) {
* an output ID that can be used to get a deferred or immediate {@link
WatermarkOutput} for that
* output.
*/
- public int registerNewOutput() {
- int newOutputId = nextOutputId;
- nextOutputId++;
- OutputState outputState = new OutputState();
- watermarkPerOutputId.put(newOutputId, outputState);
+ public void registerNewOutput(String id) {
+ final OutputState outputState = new OutputState();
+
+ final OutputState previouslyRegistered =
watermarkPerOutputId.putIfAbsent(id, outputState);
+ checkState(previouslyRegistered == null, "Already contains an
output for ID %s", id);
+
watermarkOutputs.add(outputState);
- return newOutputId;
+ }
+
+ public boolean unregisterOutput(String id) {
+ return watermarkPerOutputId.remove(id) != null;
Review comment:
Ah no, I didn't think. It's of course possible because we're removing
the same instance and `equals` therefore works...
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]