ArafatKhan2198 commented on code in PR #9473: URL: https://github.com/apache/ozone/pull/9473#discussion_r2667419346
########## hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/tasks/NSSummaryAsyncFlusher.java: ########## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.recon.tasks; + +import java.io.Closeable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hdds.utils.db.RDBBatchOperation; +import org.apache.hadoop.ozone.recon.api.types.NSSummary; +import org.apache.hadoop.ozone.recon.spi.ReconNamespaceSummaryManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Async flusher for NSSummary maps with background thread. + * Workers submit their maps to a queue, background thread processes them. 
+ */ +public final class NSSummaryAsyncFlusher implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(NSSummaryAsyncFlusher.class); + + private final BlockingQueue<Map<Long, NSSummary>> flushQueue; + private final Thread backgroundFlusher; + private final AtomicReference<FlushState> state = + new AtomicReference<>(FlushState.RUNNING); + private final ReconNamespaceSummaryManager reconNamespaceSummaryManager; + private final String taskName; + + private enum FlushState { + RUNNING, + STOPPING, + STOPPED + } + + private NSSummaryAsyncFlusher(ReconNamespaceSummaryManager reconNamespaceSummaryManager, + String taskName, + int queueCapacity) { + this.reconNamespaceSummaryManager = reconNamespaceSummaryManager; + this.taskName = taskName; + this.flushQueue = new LinkedBlockingQueue<>(queueCapacity); + + this.backgroundFlusher = new Thread(this::flushLoop, taskName + "-AsyncFlusher"); + this.backgroundFlusher.setDaemon(true); + } + + /** + * Factory method to create and start an async flusher. + */ + public static NSSummaryAsyncFlusher create( + ReconNamespaceSummaryManager reconNamespaceSummaryManager, + String taskName, + int queueCapacity) { + NSSummaryAsyncFlusher flusher = new NSSummaryAsyncFlusher( + reconNamespaceSummaryManager, taskName, queueCapacity); + flusher.backgroundFlusher.start(); + LOG.info("{}: Started async flusher with queue capacity {}", taskName, queueCapacity); + return flusher; + } + + /** + * Submit a worker map for async flushing. + * Blocks if queue is full (natural backpressure). + */ + public void submitForFlush(Map<Long, NSSummary> workerMap) throws InterruptedException { + flushQueue.put(workerMap); + LOG.debug("{}: Submitted map with {} entries, queue size now {}", taskName, workerMap.size(), flushQueue.size()); + } + + /** + * Background thread loop that processes flush queue. 
+ */ + private void flushLoop() { + while (state.get() == FlushState.RUNNING || !flushQueue.isEmpty()) { + try { + // Attempt to retrieve one batch from the queue + Map<Long, NSSummary> workerMap = flushQueue.poll(100, TimeUnit.MILLISECONDS); + + if (workerMap == null) { + continue; + } + + // Process this batch + LOG.debug("{}: Background thread processing batch with {} entries", taskName, workerMap.size()); + flushWithPropagation(workerMap); + LOG.debug("{}: Background thread finished batch", taskName); + + } catch (InterruptedException e) { + // If we're stopping, ignore interrupts and keep draining the queue. + // Otherwise, preserve interrupt and exit. + if (state.get() == FlushState.STOPPING) { + LOG.debug("{}: Flusher thread interrupted while stopping; continuing to drain queue", + taskName); + Thread.interrupted(); // clear interrupt flag + continue; + } + LOG.info("{}: Flusher thread interrupted", taskName); + Thread.currentThread().interrupt(); + break; + } catch (Exception e) { + LOG.error("{}: Error in flush loop", taskName, e); + // Continue processing other batches Review Comment: Hi @sumitagrawl, The async flusher now tracks a FAILED state: on any DB write error (or any other error), it records the exception and stops processing. Worker threads check the flusher's health before processing each record and stop within milliseconds if a failure is detected. The queue also rejects new batches immediately after a failure, and `close()` propagates the original DB exception so the main task fails cleanly. Result: no wasted work, fast failure detection, a protected queue, and clear errors that preserve the original DB issue. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
