loquisgon commented on a change in pull request #11294:
URL: https://github.com/apache/druid/pull/11294#discussion_r661901615
##########
File path:
server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java
##########
@@ -604,48 +557,39 @@ private SegmentsAndCommitMetadata persistAllAndClear()
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
AtomicLong totalPersistedRows = new AtomicLong(numPersistedRows);
- final ListenableFuture<Object> future = persistExecutor.submit(
- new Callable<Object>()
- {
- @Override
- public Object call()
- {
- try {
- for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
- metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
- }
-
- log.info(
- "Persisted in-memory data for segments: %s",
- indexesToPersist.stream()
- .map(itp -> itp.rhs.asSegmentId().toString())
- .distinct()
- .collect(Collectors.joining(", "))
- );
- log.info(
- "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]",
- rowIngestionMeters.getProcessed(),
- totalPersistedRows.get(),
- totalSinks,
- totalHydrantsCount.longValue(),
- totalHydrantsPersistedAcrossSinks.longValue()
- );
-
- // return null if committer is null
- return null;
- }
- catch (Exception e) {
- metrics.incrementFailedPersists();
- throw e;
- }
- finally {
- metrics.incrementNumPersists();
- metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
- persistStopwatch.stop();
- }
- }
- }
- );
+
+ try {
+ for (Pair<FireHydrant, SegmentIdWithShardSpec> pair : indexesToPersist) {
+ metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
+ }
+
+ log.info(
+ "Persisted in-memory data for segments: %s",
+ indexesToPersist.stream()
+ .filter(itp -> itp.rhs != null)
+ .map(itp -> itp.rhs.asSegmentId().toString())
+ .distinct()
+ .collect(Collectors.joining(", "))
+ );
+ log.info(
+ "Persisted stats: processed rows: [%d], persisted rows[%d], sinks: [%d], total fireHydrants (across sinks): [%d], persisted fireHydrants (across sinks): [%d]",
+ rowIngestionMeters.getProcessed(),
+ totalPersistedRows.get(),
+ totalSinks,
+ totalHydrantsCount.longValue(),
+ totalHydrantsPersistedAcrossSinks.longValue()
+ );
+
+ }
+ catch (Exception e) {
+ metrics.incrementFailedPersists();
+ throw e;
+ }
+ finally {
+ metrics.incrementNumPersists();
+ metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
+ persistStopwatch.stop();
+ }
final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
metrics.incrementPersistBackPressureMillis(startDelay);
Review comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]