loquisgon commented on a change in pull request #11123:
URL: https://github.com/apache/druid/pull/11123#discussion_r620489264



##########
File path: server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
##########
@@ -558,35 +570,44 @@ public void clear() throws InterruptedException
     final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
     int numPersistedRows = 0;
     long bytesPersisted = 0L;
+    AtomicLong totalHydrantsCount = new AtomicLong();
+    AtomicLong totalHydrantsPersisted = new AtomicLong();
+    final long totalSinks = sinks.size();
     for (Map.Entry<SegmentIdWithShardSpec, Sink> entry : sinks.entrySet()) {
       final SegmentIdWithShardSpec identifier = entry.getKey();
       final Sink sink = entry.getValue();
       if (sink == null) {
         throw new ISE("No sink for identifier: %s", identifier);
       }
       final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+      totalHydrantsCount.addAndGet(hydrants.size());
       currentHydrants.put(identifier.toString(), hydrants.size());
       numPersistedRows += sink.getNumRowsInMemory();
       bytesPersisted += sink.getBytesInMemory();
 
       final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
 
+      // gather hydrants that have not been persisted:
       for (FireHydrant hydrant : hydrants.subList(0, limit)) {
         if (!hydrant.hasSwapped()) {
           log.debug("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
           indexesToPersist.add(Pair.of(hydrant, identifier));
+          totalHydrantsPersisted.addAndGet(1);
         }
       }
 
       if (sink.swappable()) {
+        // It is swappable. Get the old one to persist it and create a new one:
         indexesToPersist.add(Pair.of(sink.swap(), identifier));
+        totalHydrantsPersisted.addAndGet(1);

Review comment:
       The count is incremented here because sink.swap() creates a new hydrant (which should not be counted) but returns the *old* hydrant (which needs to be counted, since it is the one added to indexesToPersist).
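
       To illustrate the counting rule, here is a minimal sketch. ToyHydrant, ToySink, and SwapCountSketch are hypothetical stand-ins written only for this example; they are not Druid's FireHydrant or Sink, and the real swap() does more than this. The sketch only assumes the behavior described above: swap() installs a fresh hydrant and hands back the previous one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a hydrant; not Druid's FireHydrant.
class ToyHydrant {
    final int id;

    ToyHydrant(int id) {
        this.id = id;
    }
}

// Hypothetical stand-in for a sink; not Druid's Sink.
class ToySink {
    private final List<ToyHydrant> hydrants = new ArrayList<>();
    private ToyHydrant current;

    ToySink() {
        current = new ToyHydrant(0);
        hydrants.add(current);
    }

    // Assumed behavior: a fresh hydrant becomes current, and the
    // previous one is returned so the caller can persist it.
    ToyHydrant swap() {
        ToyHydrant old = current;
        current = new ToyHydrant(old.id + 1);
        hydrants.add(current);
        return old;
    }

    int hydrantCount() {
        return hydrants.size();
    }

    ToyHydrant current() {
        return current;
    }
}

class SwapCountSketch {
    // Queues the hydrant returned by swap() and reports how many
    // hydrants were queued for persistence (always one per swap).
    static int persistAfterSwap(ToySink sink, List<ToyHydrant> toPersist) {
        int persisted = 0;
        toPersist.add(sink.swap()); // the old hydrant is queued
        persisted += 1;             // count the old hydrant only
        return persisted;
    }
}
```

       After one swap the toy sink holds two hydrants, but only the returned (old) one is queued, so the persisted count goes up by one and the newly created hydrant is left out.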






