jon-wei commented on a change in pull request #6724: Fix issue that tasks failed because of no sink for identifier
URL: https://github.com/apache/incubator-druid/pull/6724#discussion_r243482186
 
 

 ##########
 File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
 ##########
 @@ -485,8 +486,12 @@ public void clear() throws InterruptedException
     final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = new ArrayList<>();
     int numPersistedRows = 0;
     long bytesPersisted = 0L;
-    for (SegmentIdentifier identifier : sinks.keySet()) {
-      final Sink sink = sinks.get(identifier);
+    Iterator<Map.Entry<SegmentIdentifier, Sink>> iterator = sinks.entrySet().iterator();
+
+    while (iterator.hasNext()) {
 
 Review comment:
   If I understand correctly, from the code in `abandonSegment`, it looks like 
the sink removal should not be happening at the same time as a persist (they're 
both run by the `persistExecutor`).
   
   With this change to using an iterator, I think it would now be possible to 
persist a Sink that was already abandoned (i.e., `abandonSegment` could remove 
a Sink after that Sink is returned by `iterator.next()` in `persistAll`).
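
To illustrate the concern, here is a minimal sketch, assuming `sinks` is a `ConcurrentHashMap` (whose iterators are weakly consistent and do not throw on concurrent removal). The `String` keys and values are hypothetical stand-ins for `SegmentIdentifier` and `Sink`, and the removal is done inline to simulate `abandonSegment` interleaving with the loop:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StaleSinkIterationSketch
{
  // Returns the value still reachable through the iterator's entry
  // after the same key has been removed from the map.
  static String valueSeenAfterRemoval(Map<String, String> sinks)
  {
    Iterator<Map.Entry<String, String>> iterator = sinks.entrySet().iterator();
    Map.Entry<String, String> entry = iterator.next();

    // Simulates abandonSegment removing the sink between next() and its use.
    sinks.remove(entry.getKey());

    // The loop body would still operate on the removed sink.
    return entry.getValue();
  }

  public static void main(String[] args)
  {
    Map<String, String> sinks = new ConcurrentHashMap<>();
    sinks.put("segment-1", "sink-1");

    String stale = valueSeenAfterRemoval(sinks);
    System.out.println("map still has key: " + sinks.containsKey("segment-1"));
    System.out.println("loop still sees: " + stale);
  }
}
```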
   
   So I'm wondering if it would be more correct to have this "find sinks to 
persist" code run in one callable together with the actual persist work in the 
`persistExecutor` as well. 
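
Roughly what I have in mind, as a sketch only and not the actual `AppenderatorImpl` code: the class, the `String`/`Integer` stand-ins for `SegmentIdentifier`/`Sink`, and the method bodies are all hypothetical. The point is that the scan over `sinks` happens inside the task submitted to a single-threaded executor, so it cannot interleave with a removal submitted to the same executor:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class PersistInOneCallableSketch
{
  // Hypothetical stand-ins: String for SegmentIdentifier, Integer row count for Sink.
  private final Map<String, Integer> sinks = new ConcurrentHashMap<>();

  // One thread, so submitted tasks run one at a time in submission order.
  private final ExecutorService persistExecutor = Executors.newSingleThreadExecutor();

  void add(String identifier, int rows)
  {
    sinks.put(identifier, rows);
  }

  // Removal runs on the persist executor.
  Future<?> abandonSegment(String identifier)
  {
    return persistExecutor.submit(() -> { sinks.remove(identifier); });
  }

  // "Find sinks to persist" and the persist work share one callable,
  // so no removal can slip in between the two steps.
  Future<List<String>> persistAll()
  {
    return persistExecutor.submit(() -> {
      final List<String> persisted = new ArrayList<>();
      for (Map.Entry<String, Integer> entry : sinks.entrySet()) {
        persisted.add(entry.getKey()); // the real persist work would go here
      }
      return persisted;
    });
  }

  // Small driver: abandon "a", then persist; returns the identifiers persisted.
  static List<String> demo()
  {
    PersistInOneCallableSketch sketch = new PersistInOneCallableSketch();
    sketch.add("a", 10);
    sketch.add("b", 20);
    try {
      sketch.abandonSegment("a");
      return sketch.persistAll().get();
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    finally {
      sketch.persistExecutor.shutdown();
    }
  }

  public static void main(String[] args)
  {
    System.out.println(demo());
  }
}
```

Here the abandoned sink is never persisted, because the removal task is queued ahead of the persist task and both the scan and the persist run after it completes.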
   
   ----
   
   There is also a code block in `push` with a similar pattern that runs 
outside of the `pushExecutor` (`abandonSegment` uses `pushBarrier()`). I wonder 
if there are similar problems there as well:
   
   ```
   final Map<SegmentIdentifier, Sink> theSinks = new HashMap<>();
   for (final SegmentIdentifier identifier : identifiers) {
     final Sink sink = sinks.get(identifier);
     if (sink == null) {
       throw new ISE("No sink for identifier: %s", identifier);
     }
     theSinks.put(identifier, sink);
     if (sink.finishWriting()) {
       totalRows.addAndGet(-sink.getNumRows());
     }
   }
   ```
