QiuMM commented on a change in pull request #6724: Fix issue that tasks failed
because of no sink for identifier
URL: https://github.com/apache/incubator-druid/pull/6724#discussion_r244994407
##########
File path:
server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java
##########
@@ -521,6 +493,37 @@ public void clear() throws InterruptedException
@Override
public Object doCall() throws IOException
{
+ final Map<String, Integer> currentHydrants = new HashMap<>();
+ final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = new ArrayList<>();
+ int numPersistedRows = 0;
+ long bytesPersisted = 0L;
+ for (SegmentIdentifier identifier : sinks.keySet()) {
+ final Sink sink = sinks.get(identifier);
+ if (sink == null) {
+ throw new ISE("No sink for identifier: %s", identifier);
+ }
+ final List<FireHydrant> hydrants = Lists.newArrayList(sink);
+ currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
+ numPersistedRows += sink.getNumRowsInMemory();
+ bytesPersisted += sink.getBytesInMemory();
+
+ final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
+
+ for (FireHydrant hydrant : hydrants.subList(0, limit)) {
+ if (!hydrant.hasSwapped()) {
+ log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
+ indexesToPersist.add(Pair.of(hydrant, identifier));
+ }
+ }
+
+ if (sink.swappable()) {
+ indexesToPersist.add(Pair.of(sink.swap(), identifier));
+ }
+ }
+ // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
+ rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
+ bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
Review comment:
These two `addAndGet` calls use `numPersistedRows` and `bytesPersisted`, so I
moved them here. If I declared `numPersistedRows` and `bytesPersisted` outside
this inner class, they would have to be final (or effectively final) to be
referenced inside it, and they can't be because their values change in the loop.
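
To illustrate the constraint, here is a minimal standalone sketch, not the code in this PR. The class `CaptureSketch`, its method names, and the `rowsPerSink` list are hypothetical stand-ins for the sinks loop. It shows the two usual ways around the capture rule: declare the counter inside the inner class method (what this change does), or keep it outside in a mutable holder such as `AtomicInteger`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

public class CaptureSketch {

    // Option A: the counter lives inside call(), so it can be reassigned freely.
    static Callable<Integer> countInside(final List<Integer> rowsPerSink) {
        return new Callable<Integer>() {
            @Override
            public Integer call() {
                int numPersistedRows = 0; // local to call(), not a captured variable
                for (int rows : rowsPerSink) {
                    numPersistedRows += rows;
                }
                return numPersistedRows;
            }
        };
    }

    // Option B: the counter stays outside, but only the reference is final;
    // the holder's contents are mutated instead of the variable itself.
    static int countWithHolder(final List<Integer> rowsPerSink) throws Exception {
        final AtomicInteger numPersistedRows = new AtomicInteger();
        Callable<Void> task = new Callable<Void>() {
            @Override
            public Void call() {
                for (int rows : rowsPerSink) {
                    numPersistedRows.addAndGet(rows);
                }
                return null;
            }
        };
        task.call();
        return numPersistedRows.get();
    }

    // A plain `int numPersistedRows = 0;` declared outside and updated with
    // `+=` inside the anonymous class would not compile, because captured
    // locals must be final or effectively final.

    public static void main(String[] args) throws Exception {
        List<Integer> rows = Arrays.asList(3, 4, 5);
        System.out.println(countInside(rows).call());
        System.out.println(countWithHolder(rows));
    }
}
```

Option A keeps the bookkeeping next to the loop that produces it, which is why the counter updates had to move along with the declarations.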