This is an automated email from the ASF dual-hosted git repository.
ArafatKhan2198 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 8adb40535e0 HDDS-14913. Implement Scalable CSV Export for Unhealthy
Containers in Recon UI. (#10162).
8adb40535e0 is described below
commit 8adb40535e0eccb3bd88b8b458305db8fc9d812c
Author: Arafat2198 <[email protected]>
AuthorDate: Fri May 8 13:58:52 2026 +0530
HDDS-14913. Implement Scalable CSV Export for Unhealthy Containers in Recon
UI. (#10162).
---
.../common/src/main/resources/ozone-default.xml | 41 ++
.../ozone/recon/TestReconContainerEndpoint.java | 2 +-
.../hadoop/ozone/recon/ReconControllerModule.java | 2 +
.../hadoop/ozone/recon/ReconServerConfigKeys.java | 39 ++
.../hadoop/ozone/recon/api/ContainerEndpoint.java | 166 ++++++-
.../hadoop/ozone/recon/api/ExportJobManager.java | 404 +++++++++++++++++
.../hadoop/ozone/recon/api/types/ExportJob.java | 222 ++++++++++
.../persistence/ContainerHealthSchemaManager.java | 95 +++-
.../src/v2/pages/containers/containers.tsx | 480 ++++++++++++++++++---
.../src/v2/types/container.types.ts | 19 +
.../ozone/recon/api/TestExportJobManager.java | 432 +++++++++++++++++++
.../ozone/recon/api/types/TestExportJob.java | 110 +++++
.../recon/fsck/TestReconReplicationManager.java | 2 +-
.../TestUnhealthyContainersDerbyPerformance.java | 3 +-
14 files changed, 1954 insertions(+), 63 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8cbafc61f09..f3daecad495 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3424,6 +3424,47 @@
then the default value of 700 will be used.
</description>
</property>
+ <property>
+ <name>ozone.recon.export.directory</name>
+ <value></value>
+ <tag>OZONE, RECON</tag>
+ <description>
+ Directory where Recon stores exported TAR files containing unhealthy
+ container CSVs. When empty (default), the path is resolved at runtime as
+ {ozone.recon.db.dir}/exports so exports are co-located with Recon
+ metadata.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.export.max.downloads</name>
+ <value>3</value>
+ <tag>OZONE, RECON</tag>
+ <description>
+ Maximum number of times a completed export TAR file can be downloaded.
+ Once the limit is reached, the download endpoint returns HTTP 429.
+ This prevents repeated downloads from consuming excessive network bandwidth.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.export.max.jobs.total</name>
+ <value>4</value>
+ <tag>OZONE, RECON</tag>
+ <description>
+ Maximum number of export jobs (waiting + executing) that can exist at
+ once. Submissions beyond this limit are rejected with HTTP 429. The limit
+ is kept small because exports run on a single thread and the number of
+ distinct unhealthy-container states is bounded.
+ </description>
+ </property>
+ <property>
+ <name>ozone.recon.unhealthy.container.fetch.size</name>
+ <value>10000</value>
+ <tag>OZONE, RECON</tag>
+ <description>
+ Number of rows Derby returns per JDBC round-trip when streaming unhealthy
+ container records during a CSV export. Higher values reduce round-trip
+ overhead at the cost of slightly more memory per fetch batch.
+ </description>
+ </property>
<property>
<name>ozone.scm.network.topology.schema.file</name>
<value>network-topology-default.xml</value>
diff --git
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
index 3ff0a636d81..a9fcbd2689f 100644
---
a/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
+++
b/hadoop-ozone/integration-test-recon/src/test/java/org/apache/hadoop/ozone/recon/TestReconContainerEndpoint.java
@@ -233,7 +233,7 @@ private Response getContainerEndpointResponse(long
containerId) {
null, // ContainerHealthSchemaManager - not needed for this test
recon.getReconServer().getReconNamespaceSummaryManager(),
recon.getReconServer().getReconContainerMetadataManager(),
- omMetadataManagerInstance);
+ omMetadataManagerInstance, null);
return containerEndpoint.getKeysForContainer(containerId, 10, "");
}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
index 9a9dfb48e74..2827ff6cc86 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconControllerModule.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.ozone.om.protocolPB.OmTransport;
import org.apache.hadoop.ozone.om.protocolPB.OmTransportFactory;
import
org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.recon.api.ExportJobManager;
import org.apache.hadoop.ozone.recon.heatmap.HeatMapServiceImpl;
import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
import org.apache.hadoop.ozone.recon.persistence.DataSourceConfiguration;
@@ -110,6 +111,7 @@ protected void configure() {
bind(OMMetadataManager.class).to(ReconOmMetadataManagerImpl.class);
bind(ContainerHealthSchemaManager.class).in(Singleton.class);
+ bind(ExportJobManager.class).in(Singleton.class);
bind(ReconContainerMetadataManager.class)
.to(ReconContainerMetadataManagerImpl.class).in(Singleton.class);
bind(ReconFileMetadataManager.class)
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
index b4da42d8f03..47bdac86d94 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/ReconServerConfigKeys.java
@@ -253,6 +253,45 @@ public final class ReconServerConfigKeys {
"ozone.recon.scm.container.id.batch.size";
public static final long OZONE_RECON_SCM_CONTAINER_ID_BATCH_SIZE_DEFAULT =
1_000_000;
+ /**
+ * JDBC fetch size (rows returned per database round-trip) used when
+ * streaming unhealthy container records for a CSV export.
+ * Default: 10,000 rows per fetch
+ */
+ public static final String OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE =
+ "ozone.recon.unhealthy.container.fetch.size";
+ public static final int OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT =
10_000;
+
+ /**
+ * Max export jobs that can sit in the queue (waiting + executing) at once.
+ * Submissions beyond this limit are rejected with HTTP 429.
+ * Kept small because export is single-threaded and the unhealthy-container
+ * states it can be invoked for are bounded (~5).
+ * Default: 4
+ */
+ public static final String OZONE_RECON_EXPORT_MAX_JOBS_TOTAL =
+ "ozone.recon.export.max.jobs.total";
+ public static final int OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT = 4;
+
+ /**
+ * Directory where export TAR archives (each bundling the CSV part files of
+ * one export) are stored.
+ * Default: empty, which means {ozone.recon.db.dir}/exports is used; the
+ * path is resolved at runtime by ExportJobManager.
+ */
+ public static final String OZONE_RECON_EXPORT_DIRECTORY =
+ "ozone.recon.export.directory";
+
+ // Default is resolved at runtime as {ozone.recon.db.dir}/exports.
+ // Empty string signals ExportJobManager to compute the path dynamically.
+ public static final String OZONE_RECON_EXPORT_DIRECTORY_DEFAULT = "";
+
+ /**
+ * Maximum number of times a completed export TAR file can be downloaded.
+ * Once reached, further download requests are rejected with HTTP 429, which
+ * keeps repeated downloads from consuming network bandwidth.
+ * Default: 3
+ */
+ public static final String OZONE_RECON_EXPORT_MAX_DOWNLOADS =
+ "ozone.recon.export.max.downloads";
+ public static final int OZONE_RECON_EXPORT_MAX_DOWNLOADS_DEFAULT = 3;
+
/**
* Private constructor for utility class.
*/
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
index 4cf6ca85f6f..3c97c0a791f 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ContainerEndpoint.java
@@ -26,8 +26,12 @@
import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_MIN_CONTAINER_ID;
import static org.apache.hadoop.ozone.recon.ReconConstants.RECON_QUERY_PREVKEY;
+import java.io.BufferedOutputStream;
+import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.UncheckedIOException;
+import java.nio.file.Files;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
@@ -39,8 +43,10 @@
import java.util.UUID;
import java.util.stream.Collectors;
import javax.inject.Inject;
+import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
@@ -48,6 +54,7 @@
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -67,6 +74,7 @@
import org.apache.hadoop.ozone.recon.api.types.ContainerMetadata;
import org.apache.hadoop.ozone.recon.api.types.ContainersResponse;
import org.apache.hadoop.ozone.recon.api.types.DeletedContainerInfo;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
import org.apache.hadoop.ozone.recon.api.types.KeyMetadata;
import
org.apache.hadoop.ozone.recon.api.types.KeyMetadata.ContainerBlockMetadata;
import org.apache.hadoop.ozone.recon.api.types.KeysResponse;
@@ -104,6 +112,7 @@ public class ContainerEndpoint {
private final ContainerHealthSchemaManager containerHealthSchemaManager;
private final ReconNamespaceSummaryManager reconNamespaceSummaryManager;
private final OzoneStorageContainerManager reconSCM;
+ private final ExportJobManager exportJobManager;
private static final Logger LOG =
LoggerFactory.getLogger(ContainerEndpoint.class);
private BucketLayout layout = BucketLayout.DEFAULT;
@@ -145,7 +154,8 @@ public ContainerEndpoint(OzoneStorageContainerManager
reconSCM,
ContainerHealthSchemaManager
containerHealthSchemaManager,
ReconNamespaceSummaryManager
reconNamespaceSummaryManager,
ReconContainerMetadataManager
reconContainerMetadataManager,
- ReconOMMetadataManager omMetadataManager) {
+ ReconOMMetadataManager omMetadataManager,
+ ExportJobManager exportJobManager) {
this.containerManager =
(ReconContainerManager) reconSCM.getContainerManager();
this.pipelineManager = reconSCM.getPipelineManager();
@@ -154,6 +164,7 @@ public ContainerEndpoint(OzoneStorageContainerManager
reconSCM,
this.reconSCM = reconSCM;
this.reconContainerMetadataManager = reconContainerMetadataManager;
this.omMetadataManager = omMetadataManager;
+ this.exportJobManager = exportJobManager;
}
/**
@@ -502,6 +513,159 @@ public Response getUnhealthyContainers(
minContainerId);
}
+  /**
+   * List all export jobs tracked by the server (any status).
+   *
+   * <p>Every job's queue position is refreshed on each call: the 1-based
+   * position while the job is still waiting in the queue, 0 otherwise.
+   *
+   * @return Response containing a list of ExportJob objects
+   */
+  @GET
+  @Path("/unhealthy/export")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response listExportJobs() {
+    List<ExportJob> jobs = exportJobManager.getAllJobs();
+    for (ExportJob job : jobs) {
+      // Refresh unconditionally: getQueuePosition returns 0 for a job that is
+      // not in the queue, so a job that has started or finished reports 0
+      // instead of the position last computed while it was still QUEUED.
+      job.setQueuePosition(exportJobManager.getQueuePosition(job.getJobId()));
+    }
+    return Response.ok(jobs).build();
+  }
+
+  /**
+   * Submits an asynchronous CSV export of the unhealthy containers in one
+   * state. The call returns at once; clients poll the returned job by its ID.
+   *
+   * @param state name of an UnHealthyContainerStates constant, e.g. MISSING
+   *     or UNDER_REPLICATED (required)
+   * @return 200 with the newly created ExportJob, or 429 with a JSON error
+   *     body when the manager rejects the submission
+   * @throws WebApplicationException with 400 if {@code state} is missing or
+   *     is not a known unhealthy-container state
+   */
+  @POST
+  @Path("/unhealthy/export")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response startExport(@QueryParam("state") String state) {
+    if (StringUtils.isEmpty(state)) {
+      throw new WebApplicationException("state query parameter is required",
+          Response.Status.BAD_REQUEST);
+    }
+
+    try {
+      ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(state);
+    } catch (IllegalArgumentException unknownState) {
+      throw new WebApplicationException("Invalid state: " + state,
+          Response.Status.BAD_REQUEST);
+    }
+
+    String newJobId;
+    try {
+      newJobId = exportJobManager.submitJob(state);
+    } catch (IllegalStateException rejection) {
+      // Answer with a JSON body rather than the container's default HTML page.
+      Map<String, String> body = new HashMap<>();
+      body.put("error", "Too Many Requests");
+      body.put("message", rejection.getMessage());
+      return Response.status(Response.Status.TOO_MANY_REQUESTS)
+          .type(MediaType.APPLICATION_JSON)
+          .entity(body)
+          .build();
+    }
+    return Response.ok(exportJobManager.getJob(newJobId)).build();
+  }
+
+  /**
+   * Get the status of an export job.
+   *
+   * @param jobId The job ID returned by startExport
+   * @return Response containing the ExportJob with current status/progress;
+   *     its queue position is the 1-based position while the job waits in
+   *     the queue, and 0 otherwise
+   * @throws WebApplicationException with 404 if no such job exists
+   */
+  @GET
+  @Path("/unhealthy/export/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response getExportStatus(@PathParam("jobId") String jobId) {
+    ExportJob job = exportJobManager.getJob(jobId);
+    if (job == null) {
+      throw new WebApplicationException("Job not found",
+          Response.Status.NOT_FOUND);
+    }
+
+    // Refresh unconditionally: getQueuePosition returns 0 for a job that is
+    // no longer queued, so a started or finished job reports 0 instead of
+    // the position last computed while it was still QUEUED.
+    job.setQueuePosition(exportJobManager.getQueuePosition(jobId));
+
+    return Response.ok(job).build();
+  }
+
+  /**
+   * Download a completed export TAR file.
+   *
+   * <p>Each call that passes the checks below consumes one of the job's
+   * limited downloads before the file is streamed.
+   *
+   * @param jobId The job ID
+   * @return 200 with the TAR file stream, or 429 with a JSON error body once
+   *     the job's download limit has been reached
+   * @throws WebApplicationException with 404 if the job or its file does not
+   *     exist, or 409 if the job has not completed yet
+   */
+  @GET
+  @Path("/unhealthy/export/{jobId}/download")
+  @Produces("application/x-tar")
+  public Response downloadExport(@PathParam("jobId") String jobId) {
+    ExportJob job = exportJobManager.getJob(jobId);
+    if (job == null) {
+      throw new WebApplicationException("Job not found",
+          Response.Status.NOT_FOUND);
+    }
+    if (job.getStatus() != ExportJob.JobStatus.COMPLETED) {
+      throw new WebApplicationException("Job not completed yet",
+          Response.Status.CONFLICT);
+    }
+
+    File file = new File(job.getFilePath());
+    if (!file.exists()) {
+      throw new WebApplicationException("Export file not found",
+          Response.Status.NOT_FOUND);
+    }
+
+    if (!job.tryReserveDownload()) {
+      // Same JSON error shape as the 429 returned by startExport.
+      Map<String, String> errorResponse = new HashMap<>();
+      errorResponse.put("error", "Download limit reached");
+      errorResponse.put("message", "This export has reached its maximum download limit of "
+          + job.getMaxDownloads() + ".");
+      return Response.status(Response.Status.TOO_MANY_REQUESTS)
+          .entity(errorResponse)
+          .type(MediaType.APPLICATION_JSON)
+          .build();
+    }
+
+    LOG.info("Download {} of {} for job {}", job.getDownloadCount(),
+        job.getMaxDownloads(), jobId);
+
+    StreamingOutput stream = outputStream -> {
+      try (InputStream fis = Files.newInputStream(file.toPath());
+           BufferedOutputStream bos = new BufferedOutputStream(outputStream, 256 * 1024)) {
+        byte[] buffer = new byte[8192];
+        int bytesRead;
+        while ((bytesRead = fis.read(buffer)) != -1) {
+          bos.write(buffer, 0, bytesRead);
+        }
+        bos.flush();
+      }
+    };
+
+    return Response.ok(stream)
+        .header("Content-Disposition", "attachment; filename=\"" + job.getFileName() + "\"")
+        .header("Content-Type", "application/x-tar")
+        // Lets clients show download progress. The archive is fully written
+        // before its job is marked COMPLETED, so its size is final here.
+        .header("Content-Length", file.length())
+        .build();
+  }
+
+  /**
+   * Cancels a queued or running export, or deletes a finished one together
+   * with its TAR file.
+   *
+   * @param jobId The job ID
+   * @return an empty 200 response once the job has been removed
+   * @throws WebApplicationException with 404 if the manager reports the job
+   *     as unknown
+   */
+  @DELETE
+  @Path("/unhealthy/export/{jobId}")
+  public Response cancelExport(@PathParam("jobId") String jobId) {
+    try {
+      exportJobManager.cancelJob(jobId);
+    } catch (IllegalStateException unknownJob) {
+      throw new WebApplicationException(unknownJob.getMessage(),
+          Response.Status.NOT_FOUND);
+    }
+    return Response.ok().build();
+  }
+
/**
* This API will return all DELETED containers in SCM in below JSON format.
* {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java
new file mode 100644
index 00000000000..075cc9ddf7f
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/ExportJobManager.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.PreDestroy;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.utils.Archiver;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
+import
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages asynchronous CSV export jobs for unhealthy containers.
+ *
+ * <p>Jobs run one at a time on a single worker thread. Each job streams the
+ * records for one unhealthy-container state into CSV part files inside a
+ * per-job working directory, packs that directory into a TAR archive in the
+ * export directory, and then deletes the working directory. At most one
+ * QUEUED, RUNNING or COMPLETED job may exist per state.
+ *
+ * <p>{@code jobQueue}, and the capacity / duplicate-state checks performed
+ * over {@code jobTracker}, are guarded by the {@code jobQueue} monitor.
+ */
+@Singleton
+public class ExportJobManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ExportJobManager.class);
+
+  /** Maximum number of records written to one CSV part file. */
+  private static final int RECORDS_PER_FILE = 500_000;
+
+  /** The current CSV part is flushed every this many records. */
+  private static final int FLUSH_INTERVAL = 10_000;
+
+  /** Header line written at the top of every CSV part file. */
+  private static final String CSV_HEADER =
+      "container_id,container_state,in_state_since,"
+          + "expected_replica_count,actual_replica_count,replica_delta\n";
+
+  /** Every job known to the manager (any status), keyed by job ID. */
+  private final Map<String, ExportJob> jobTracker = new ConcurrentHashMap<>();
+  /** Jobs still waiting for the worker, in submission order. */
+  private final LinkedHashMap<String, ExportJob> jobQueue = new LinkedHashMap<>();
+  /** Tasks that are waiting or executing, keyed by job ID, for cancellation. */
+  private final Map<String, Future<?>> runningTasks = new ConcurrentHashMap<>();
+  private final ExecutorService workerPool;
+  private final ContainerHealthSchemaManager containerHealthSchemaManager;
+  private final String exportDirectory;
+  private final int maxDownloads;
+  private final int maxQueueSize;
+
+  /**
+   * Creates the manager. It resolves and creates the export directory and
+   * removes export artifacts left behind by a previous Recon process.
+   *
+   * @param containerHealthSchemaManager source of unhealthy-container records
+   * @param conf configuration supplying the export settings
+   */
+  @Inject
+  public ExportJobManager(ContainerHealthSchemaManager containerHealthSchemaManager,
+      OzoneConfiguration conf) {
+    this.containerHealthSchemaManager = containerHealthSchemaManager;
+
+    // A single worker thread runs exports sequentially, so exports never
+    // query the database concurrently.
+    this.workerPool = Executors.newSingleThreadExecutor();
+
+    this.exportDirectory = resolveExportDirectory(conf);
+    this.maxDownloads = conf.getInt(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_DOWNLOADS,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_DOWNLOADS_DEFAULT);
+    this.maxQueueSize = conf.getInt(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL_DEFAULT);
+
+    try {
+      Files.createDirectories(Paths.get(exportDirectory));
+    } catch (IOException e) {
+      LOG.error("Failed to create export directory: {}", exportDirectory, e);
+    }
+
+    // Bound disk usage to what the current Recon process has started.
+    cleanupLeftoverArtifacts();
+
+    LOG.info("ExportJobManager initialized with single-threaded queue (max {} jobs)",
+        maxQueueSize);
+  }
+
+  /**
+   * Returns the configured export directory. When none is configured, it
+   * falls back to {@code {ozone.recon.db.dir}/exports} so exports survive OS
+   * restarts alongside Recon data.
+   */
+  private static String resolveExportDirectory(OzoneConfiguration conf) {
+    String configuredDir = conf.get(
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY,
+        ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY_DEFAULT);
+    if (configuredDir != null && !configuredDir.isEmpty()) {
+      return configuredDir;
+    }
+    File reconDbDir = new ReconUtils().getReconDbDir(
+        conf, ReconServerConfigKeys.OZONE_RECON_DB_DIR);
+    return new File(reconDbDir, "exports").getAbsolutePath();
+  }
+
+  /**
+   * Deletes artifacts a previous run may have left in the export directory.
+   * These are per-job working directories (named by the job's UUID) and
+   * export archives ({@code export_*.tar}, as named by submitJob).
+   *
+   * <p>Other entries are left untouched, so pointing
+   * {@code ozone.recon.export.directory} at a shared directory cannot wipe
+   * unrelated data.
+   */
+  private void cleanupLeftoverArtifacts() {
+    File[] entries = new File(exportDirectory).listFiles();
+    if (entries == null) {
+      return;
+    }
+    int removed = 0;
+    for (File entry : entries) {
+      String name = entry.getName();
+      boolean isJobWorkDir = entry.isDirectory() && isJobId(name);
+      boolean isExportTar = !entry.isDirectory()
+          && name.startsWith("export_") && name.endsWith(".tar");
+      if (isJobWorkDir || isExportTar) {
+        FileUtils.deleteQuietly(entry);
+        removed++;
+      }
+    }
+    if (removed > 0) {
+      LOG.info("Startup cleanup: removed {} leftover export artifact(s) from {}",
+          removed, exportDirectory);
+    }
+  }
+
+  /** Returns true if {@code name} is a canonical UUID string, as used for job IDs. */
+  private static boolean isJobId(String name) {
+    try {
+      // Round-trip because UUID.fromString also accepts non-canonical forms.
+      return UUID.fromString(name).toString().equals(name);
+    } catch (IllegalArgumentException e) {
+      return false;
+    }
+  }
+
+  /**
+   * Queues a new export for the given unhealthy-container state.
+   *
+   * @param state name of an UnHealthyContainerStates constant
+   * @return the new job's ID
+   * @throws IllegalStateException if an export for {@code state} is already
+   *     queued, running or completed, or if {@code maxQueueSize} jobs are
+   *     already waiting or executing
+   */
+  public String submitJob(String state) {
+    String jobId = UUID.randomUUID().toString();
+    ExportJob job = new ExportJob(jobId, state, maxDownloads);
+    // Locale.ROOT keeps file names stable under locales (e.g. Turkish) where
+    // upper-case 'I' does not lower-case to 'i'.
+    String filePath = exportDirectory + "/export_" + state.toLowerCase(java.util.Locale.ROOT)
+        + "_" + System.currentTimeMillis() + ".tar";
+    job.setFilePath(filePath);
+
+    // The task is registered in runningTasks before it is handed to the
+    // executor. Otherwise a fast task's own cleanup could run before
+    // registration (leaking the entry), or a cancel in between could miss it.
+    java.util.concurrent.FutureTask<Void> task =
+        new java.util.concurrent.FutureTask<>(() -> executeExport(job), null);
+
+    int queuePosition;
+    // Single lock for all queue-related checks and mutations.
+    synchronized (jobQueue) {
+      // Reject if a job for this state is already queued, running, or completed
+      boolean stateAlreadyExists = jobTracker.values().stream().anyMatch(
+          j -> j.getState().equals(state)
+              && (j.getStatus() == JobStatus.QUEUED
+              || j.getStatus() == JobStatus.RUNNING
+              || j.getStatus() == JobStatus.COMPLETED));
+      if (stateAlreadyExists) {
+        throw new IllegalStateException(
+            "An export for state " + state + " already exists. Please delete the existing export "
+                + "from the Completed Exports table before starting a new one.");
+      }
+
+      // The limit covers waiting AND executing jobs. jobQueue alone only holds
+      // waiting jobs (executeExport removes a job from it when it starts), so
+      // count active jobs from the tracker.
+      long activeJobs = jobTracker.values().stream()
+          .filter(j -> j.getStatus() == JobStatus.QUEUED
+              || j.getStatus() == JobStatus.RUNNING)
+          .count();
+      if (activeJobs >= maxQueueSize) {
+        throw new IllegalStateException(
+            "Export queue is full (max " + maxQueueSize + " jobs). Please try again later.");
+      }
+
+      jobTracker.put(jobId, job);
+      jobQueue.put(jobId, job);
+      runningTasks.put(jobId, task);
+      queuePosition = jobQueue.size();
+    }
+
+    // Hand off outside the lock; the executor is thread-safe on its own.
+    workerPool.execute(task);
+
+    LOG.info("Submitted export job {} (state={}, queue position={})", jobId,
+        state, queuePosition);
+
+    return jobId;
+  }
+
+  public ExportJob getJob(String jobId) {
+    return jobTracker.get(jobId);
+  }
+
+  /**
+   * Returns all tracked export jobs (any status).
+   */
+  public List<ExportJob> getAllJobs() {
+    return new ArrayList<>(jobTracker.values());
+  }
+
+  /**
+   * Get the queue position for a job (1-indexed).
+   * Returns 0 if job is not in queue (running, completed, or not found).
+   */
+  public int getQueuePosition(String jobId) {
+    synchronized (jobQueue) {
+      if (!jobQueue.containsKey(jobId)) {
+        return 0;
+      }
+
+      int position = 1;
+      for (String id : jobQueue.keySet()) {
+        if (id.equals(jobId)) {
+          return position;
+        }
+        position++;
+      }
+      return 0;
+    }
+  }
+
+  /**
+   * Unified cleanup: cancels a QUEUED or RUNNING job, or deletes a
+   * COMPLETED/FAILED job. In all cases the job's TAR file is deleted and the
+   * job is removed from the tracker.
+   *
+   * @param jobId ID of the job to cancel or delete
+   * @throws IllegalStateException if no such job is tracked
+   */
+  public void cancelJob(String jobId) {
+    ExportJob job = jobTracker.get(jobId);
+    if (job == null) {
+      throw new IllegalStateException("Job not found: " + jobId);
+    }
+    // Captured before the status is overwritten below, for the log line.
+    JobStatus previousStatus = job.getStatus();
+
+    if (previousStatus == JobStatus.QUEUED || previousStatus == JobStatus.RUNNING) {
+      // Remove from queue if still waiting
+      synchronized (jobQueue) {
+        jobQueue.remove(jobId);
+      }
+      Future<?> future = runningTasks.remove(jobId);
+      if (future != null) {
+        // Prevents a waiting task from starting, or interrupts a running one.
+        future.cancel(true);
+      }
+      job.setStatus(JobStatus.FAILED);
+      job.setErrorMessage("Cancelled by user");
+      // Clean up any partial working directory
+      FileUtils.deleteQuietly(new File(exportDirectory + "/" + jobId));
+    }
+
+    // File I/O does not need the lock.
+    if (job.getFilePath() != null) {
+      FileUtils.deleteQuietly(new File(job.getFilePath()));
+    }
+
+    // Remove from both maps under the lock so submitJob's checks (which also
+    // run inside synchronized(jobQueue)) never see a half-removed job.
+    synchronized (jobQueue) {
+      jobQueue.remove(jobId); // no-op for jobs already off the queue
+      jobTracker.remove(jobId);
+    }
+
+    LOG.info("Deleted export job {} file={} (was {})", jobId,
+        job.getFileName(), previousStatus);
+  }
+
+  /**
+   * Runs one export on the worker thread. It writes the CSV parts, archives
+   * them into the job's TAR file, deletes the working directory, and marks
+   * the job COMPLETED.
+   *
+   * <p>On failure or cancellation the job is marked FAILED and its partial
+   * output is removed.
+   */
+  private void executeExport(ExportJob job) {
+    String jobDirectory = exportDirectory + "/" + job.getJobId();
+    Path jobDir = Paths.get(jobDirectory);
+    String tarFilePath = job.getFilePath(); // set in submitJob
+
+    try {
+      Files.createDirectories(jobDir);
+
+      // Remove from queue and mark as running
+      synchronized (jobQueue) {
+        jobQueue.remove(job.getJobId());
+      }
+      job.setStatus(JobStatus.RUNNING);
+      LOG.info("Starting export job {}", job.getJobId());
+
+      // The DB cursor is opened and closed inside, so it is not held while
+      // the archive is being built.
+      long totalRecords = writeCsvParts(job, jobDirectory);
+
+      if (isCancelled(job)) {
+        throw new InterruptedException("Job cancelled");
+      }
+
+      File tarFile = new File(tarFilePath);
+      Archiver.create(tarFile, jobDir);
+      LOG.info("Created TAR archive: {}", tarFilePath);
+
+      FileUtils.deleteDirectory(jobDir.toFile());
+      LOG.info("Deleted temporary CSV files for job {}", job.getJobId());
+
+      // cancelJob may have run while the archive was being written. Its
+      // attempt to delete the TAR would then have found nothing, so the
+      // archive is removed here instead of being left orphaned on disk.
+      if (isCancelled(job)) {
+        throw new InterruptedException("Job cancelled");
+      }
+
+      job.setFilePath(tarFilePath);
+      job.setStatus(JobStatus.COMPLETED);
+      LOG.info("Completed export job {} ({} records)", job.getJobId(), totalRecords);
+
+    } catch (InterruptedException e) {
+      job.setStatus(JobStatus.FAILED);
+      job.setErrorMessage("Job was cancelled");
+      FileUtils.deleteQuietly(jobDir.toFile());
+      FileUtils.deleteQuietly(new File(tarFilePath));
+      LOG.info("Export job {} was cancelled", job.getJobId());
+      Thread.currentThread().interrupt();
+    } catch (IOException | RuntimeException e) {
+      job.setStatus(JobStatus.FAILED);
+      job.setErrorMessage(e.getMessage());
+      FileUtils.deleteQuietly(jobDir.toFile());
+      FileUtils.deleteQuietly(new File(tarFilePath));
+      LOG.error("Export job {} failed", job.getJobId(), e);
+    } finally {
+      runningTasks.remove(job.getJobId());
+    }
+  }
+
+  /**
+   * Streams every unhealthy container in the job's state into CSV part files
+   * under {@code jobDirectory}. Each part holds at most
+   * {@link #RECORDS_PER_FILE} records, and the job's progress counters are
+   * updated as records are written.
+   *
+   * @return the number of records written
+   * @throws InterruptedException if the worker thread is interrupted
+   */
+  private long writeCsvParts(ExportJob job, String jobDirectory)
+      throws IOException, InterruptedException {
+    ContainerSchemaDefinition.UnHealthyContainerStates internalState =
+        ContainerSchemaDefinition.UnHealthyContainerStates.valueOf(job.getState());
+
+    // Get total count first for progress tracking
+    long estimatedTotal =
+        containerHealthSchemaManager.getUnhealthyContainersCount(internalState, -1, 0);
+    job.setEstimatedTotal(estimatedTotal);
+    LOG.info("Export job {} will process approximately {} records",
+        job.getJobId(), estimatedTotal);
+
+    String statePrefix = job.getState().toLowerCase(java.util.Locale.ROOT);
+    long totalRecords = 0;
+    long recordsInCurrentFile = 0;
+    int filesWritten = 0;
+    BufferedWriter writer = null;
+    // -1 = unlimited, 0 = no prevKey offset
+    try (Cursor<UnhealthyContainersRecord> cursor =
+             containerHealthSchemaManager.getUnhealthyContainersCursor(internalState, -1, 0)) {
+      while (cursor.hasNext()) {
+        if (Thread.currentThread().isInterrupted()) {
+          throw new InterruptedException("Job cancelled");
+        }
+
+        // Start a new part file when none is open.
+        if (writer == null) {
+          filesWritten++;
+          writer = openCsvPart(Paths.get(String.format(
+              "%s/unhealthy_containers_%s_part%03d.csv",
+              jobDirectory, statePrefix, filesWritten)));
+          writer.write(CSV_HEADER);
+          LOG.info("Created CSV file: part{}", filesWritten);
+        }
+
+        writer.write(formatCsvRow(cursor.fetchNext()));
+        totalRecords++;
+        recordsInCurrentFile++;
+        job.setTotalRecords(totalRecords);
+
+        if (recordsInCurrentFile >= RECORDS_PER_FILE) {
+          writer.close(); // close() flushes
+          writer = null;
+          recordsInCurrentFile = 0;
+        } else if (recordsInCurrentFile % FLUSH_INTERVAL == 0) {
+          writer.flush();
+        }
+      }
+
+      // Close the last part normally so write errors propagate.
+      if (writer != null) {
+        writer.close();
+        writer = null;
+      }
+    } finally {
+      // Only reached with an open writer when an exception is propagating.
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          LOG.warn("Error closing writer", e);
+        }
+      }
+    }
+
+    LOG.info("Export job {} wrote {} records across {} files",
+        job.getJobId(), totalRecords, filesWritten);
+    return totalRecords;
+  }
+
+  /** Opens (creating or truncating) a UTF-8 CSV part file for writing. */
+  private static BufferedWriter openCsvPart(Path file) throws IOException {
+    OutputStream out = Files.newOutputStream(file);
+    return new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
+  }
+
+  /** Formats one record as an unquoted CSV line in {@link #CSV_HEADER} order. */
+  private static String formatCsvRow(UnhealthyContainersRecord rec) {
+    return new StringBuilder(128)
+        .append(rec.getContainerId()).append(',')
+        .append(rec.getContainerState()).append(',')
+        .append(rec.getInStateSince()).append(',')
+        .append(rec.getExpectedReplicaCount()).append(',')
+        .append(rec.getActualReplicaCount()).append(',')
+        .append(rec.getReplicaDelta()).append('\n')
+        .toString();
+  }
+
+  /**
+   * Returns true if the worker thread was interrupted, or if cancelJob has
+   * already removed this job from the tracker.
+   */
+  private boolean isCancelled(ExportJob job) {
+    return Thread.currentThread().isInterrupted()
+        || jobTracker.get(job.getJobId()) != job;
+  }
+
+  /**
+   * Stops the worker, interrupting any running export, and waits up to 30
+   * seconds for it to terminate.
+   */
+  @PreDestroy
+  public void shutdown() {
+    LOG.info("Shutting down ExportJobManager");
+    workerPool.shutdownNow();
+    try {
+      if (!workerPool.awaitTermination(30, TimeUnit.SECONDS)) {
+        LOG.warn("Export worker did not terminate within 30 seconds");
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted while waiting for executor shutdown", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java
new file mode 100644
index 00000000000..005533c61fd
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/api/types/ExportJob.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.nio.file.Paths;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Represents an asynchronous CSV export job.
+ *
+ * <p>A job is updated by an export worker thread while it runs and read by
+ * request-handling threads that report its progress. Every mutable field is
+ * therefore {@code volatile}, and the download counter is an
+ * {@link AtomicInteger}, so updates are visible across threads without
+ * locking.</p>
+ */
+public class ExportJob {
+  @JsonProperty("jobId")
+  private final String jobId;
+
+  @JsonProperty("state")
+  private final String state;
+
+  @JsonProperty("status")
+  private volatile JobStatus status;
+
+  @JsonProperty("submittedAt")
+  private final long submittedAt;
+
+  @JsonProperty("startedAt")
+  private volatile long startedAt;
+
+  @JsonProperty("completedAt")
+  private volatile long completedAt;
+
+  @JsonProperty("totalRecords")
+  private volatile long totalRecords;
+
+  // -1 until an estimate is supplied via setEstimatedTotal().
+  @JsonProperty("estimatedTotal")
+  private volatile long estimatedTotal;
+
+  // Full server-side path, kept for file I/O only. Its getter is @JsonIgnore'd;
+  // only the bare file name is exposed via JSON.
+  private volatile String filePath;
+
+  @JsonProperty("fileName")
+  private volatile String fileName;
+
+  @JsonProperty("errorMessage")
+  private volatile String errorMessage;
+
+  @JsonProperty("queuePosition")
+  private volatile int queuePosition;
+
+  // Internal download quota — not serialized (its getter is @JsonIgnore'd).
+  private final int maxDownloads;
+
+  @JsonIgnore
+  private final AtomicInteger downloadCount = new AtomicInteger(0);
+
+  /**
+   * Creates a job in {@link JobStatus#QUEUED} status, stamped with the
+   * current time.
+   *
+   * @param jobId        job identifier
+   * @param state        container state name whose records are exported
+   * @param maxDownloads how many times the finished file may be downloaded
+   */
+  public ExportJob(String jobId, String state, int maxDownloads) {
+    this.jobId = jobId;
+    this.state = state;
+    this.status = JobStatus.QUEUED;
+    this.submittedAt = System.currentTimeMillis();
+    this.totalRecords = 0;
+    this.estimatedTotal = -1;
+    this.maxDownloads = maxDownloads;
+  }
+
+  public String getJobId() {
+    return jobId;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public JobStatus getStatus() {
+    return status;
+  }
+
+  /**
+   * Updates the status. The first transition to RUNNING records
+   * {@code startedAt}; the first transition to COMPLETED or FAILED records
+   * {@code completedAt}.
+   */
+  public void setStatus(JobStatus status) {
+    if (status == JobStatus.RUNNING && startedAt == 0) {
+      startedAt = System.currentTimeMillis();
+    } else if ((status == JobStatus.COMPLETED || status == JobStatus.FAILED)
+        && completedAt == 0) {
+      completedAt = System.currentTimeMillis();
+    }
+    // Publish the status last. A reader that observes the new status is then
+    // guaranteed to also see the timestamp written above.
+    this.status = status;
+  }
+
+  public long getSubmittedAt() {
+    return submittedAt;
+  }
+
+  public long getStartedAt() {
+    return startedAt;
+  }
+
+  public long getCompletedAt() {
+    return completedAt;
+  }
+
+  public long getTotalRecords() {
+    return totalRecords;
+  }
+
+  public void setTotalRecords(long totalRecords) {
+    this.totalRecords = totalRecords;
+  }
+
+  public long getEstimatedTotal() {
+    return estimatedTotal;
+  }
+
+  public void setEstimatedTotal(long estimatedTotal) {
+    this.estimatedTotal = estimatedTotal;
+  }
+
+  /** Full server-side path of the export file; internal only, never serialized. */
+  @JsonIgnore
+  public String getFilePath() {
+    return filePath;
+  }
+
+  /**
+   * Sets the export file path and derives the publicly exposed file name
+   * (the last path component) from it. Passing {@code null} clears both.
+   */
+  public void setFilePath(String filePath) {
+    this.filePath = filePath;
+    if (filePath == null) {
+      this.fileName = null;
+      return;
+    }
+    java.nio.file.Path path = Paths.get(filePath).getFileName();
+    this.fileName = path != null ? path.toString() : filePath;
+  }
+
+  public String getFileName() {
+    return fileName;
+  }
+
+  public String getErrorMessage() {
+    return errorMessage;
+  }
+
+  public void setErrorMessage(String errorMessage) {
+    this.errorMessage = errorMessage;
+  }
+
+  /**
+   * Returns progress as a percentage in [0, 100], or 0 when no estimate or no
+   * records are available yet.
+   */
+  @JsonProperty("progressPercent")
+  public int getProgressPercent() {
+    // Read each volatile once so the computation uses a consistent pair.
+    long done = totalRecords;
+    long total = estimatedTotal;
+    if (total <= 0 || done <= 0) {
+      return 0;
+    }
+    // estimatedTotal is only an estimate, so the record count can overshoot
+    // it; clamp so the UI never shows more than 100%.
+    return (int) Math.min(100L, (done * 100) / total);
+  }
+
+  public int getQueuePosition() {
+    return queuePosition;
+  }
+
+  public void setQueuePosition(int queuePosition) {
+    this.queuePosition = queuePosition;
+  }
+
+  @JsonProperty("downloadCount")
+  public int getDownloadCount() {
+    return downloadCount.get();
+  }
+
+  /** Download quota for this job; internal only, never serialized. */
+  @JsonIgnore
+  public int getMaxDownloads() {
+    return maxDownloads;
+  }
+
+  @JsonProperty("downloadsRemaining")
+  public int getDownloadsRemaining() {
+    return Math.max(0, maxDownloads - downloadCount.get());
+  }
+
+  /**
+   * Best-effort hint for the UI; may be briefly stale compared with
+   * {@link #tryReserveDownload()}.
+   */
+  public boolean isDownloadAllowed() {
+    return downloadCount.get() < maxDownloads;
+  }
+
+  /**
+   * Atomically consumes one download slot if any remain. Use this from the
+   * download endpoint so concurrent requests cannot bypass
+   * {@code maxDownloads}.
+   *
+   * @return true if a slot was reserved, false if the limit was already reached
+   */
+  public boolean tryReserveDownload() {
+    while (true) {
+      int current = downloadCount.get();
+      if (current >= maxDownloads) {
+        return false;
+      }
+      if (downloadCount.compareAndSet(current, current + 1)) {
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Current execution state of the export job.
+   */
+  public enum JobStatus {
+    QUEUED,    // Waiting for worker thread
+    RUNNING,   // Actively exporting
+    COMPLETED, // File ready for download
+    FAILED     // Error occurred
+  }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
index ac1e91350cc..64ce9495ad9 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/persistence/ContainerHealthSchemaManager.java
@@ -32,10 +32,13 @@
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
import org.apache.ozone.recon.schema.ContainerSchemaDefinition;
import
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
import
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
import org.jooq.Condition;
+import org.jooq.Cursor;
import org.jooq.DSLContext;
import org.jooq.OrderField;
import org.jooq.Record;
@@ -67,11 +70,16 @@ public class ContainerHealthSchemaManager {
static final int MAX_DELETE_CHUNK_SIZE = 1_000;
private final ContainerSchemaDefinition containerSchemaDefinition;
+ private final int unhealthyContainersFetchSize;
@Inject
public ContainerHealthSchemaManager(
- ContainerSchemaDefinition containerSchemaDefinition) {
+ ContainerSchemaDefinition containerSchemaDefinition,
+ OzoneConfiguration conf) {
this.containerSchemaDefinition = containerSchemaDefinition;
+ this.unhealthyContainersFetchSize = conf.getInt(
+ ReconServerConfigKeys.OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE,
+
ReconServerConfigKeys.OZONE_RECON_UNHEALTHY_CONTAINER_FETCH_SIZE_DEFAULT);
}
/**
@@ -395,6 +403,91 @@ public void clearAllUnhealthyContainerRecords() {
}
}
+  /**
+   * Counts unhealthy containers in a single health state.
+   *
+   * <p>The database always evaluates an unbounded {@code SELECT COUNT(*)}.
+   * {@code limit} is applied afterwards and only caps the value handed back,
+   * so the UI can show a bounded estimated total.</p>
+   *
+   * @param state   container health state to count (required)
+   * @param limit   when positive and smaller than the real count, returned in
+   *                place of it; pass -1 to always get the real count
+   * @param prevKey when positive, only rows with {@code container_id > prevKey}
+   *                are counted
+   * @return the number of matching rows, capped at {@code limit} when applicable
+   */
+  public long getUnhealthyContainersCount(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dsl = containerSchemaDefinition.getDSLContext();
+
+    Condition filter = UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString());
+    if (prevKey > 0) {
+      filter = filter.and(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+
+    long actual = dsl.selectCount()
+        .from(UNHEALTHY_CONTAINERS)
+        .where(filter)
+        .fetchOne(0, long.class);
+
+    // A positive limit below the real count becomes the reported estimate.
+    boolean capApplies = limit > 0 && limit < actual;
+    return capApplies ? limit : actual;
+  }
+
+  /**
+   * Opens a lazily fetched cursor over the unhealthy container records in one
+   * state, ordered by ascending container ID.
+   *
+   * <p>The caller owns the returned cursor and MUST close it, for example with
+   * try-with-resources.</p>
+   *
+   * <p>Generated SQL example (50,000 MISSING containers, starting after
+   * container ID 12345):</p>
+   *
+   * <pre>
+   * SELECT * FROM unhealthy_containers
+   * WHERE container_state = 'MISSING'
+   *   AND container_id &gt; 12345
+   * ORDER BY container_id ASC
+   * LIMIT 50000
+   * </pre>
+   *
+   * @param state   filter by state (required)
+   * @param limit   maximum number of records to return; any value &lt;= 0
+   *                (e.g. -1) means unlimited
+   * @param prevKey when positive, only containers with
+   *                {@code container_id > prevKey} are returned (cursor-based
+   *                pagination)
+   * @return a cursor yielding {@link UnhealthyContainersRecord}s
+   */
+  public Cursor<UnhealthyContainersRecord> getUnhealthyContainersCursor(
+      UnHealthyContainerStates state, int limit, long prevKey) {
+    DSLContext dslContext = containerSchemaDefinition.getDSLContext();
+    SelectQuery<UnhealthyContainersRecord> query =
+        dslContext.selectFrom(UNHEALTHY_CONTAINERS).getQuery();
+
+    // WHERE container_state = ?
+    query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_STATE.eq(state.toString()));
+
+    if (prevKey > 0) {
+      // AND container_id > ? (cursor-based pagination)
+      query.addConditions(UNHEALTHY_CONTAINERS.CONTAINER_ID.gt(prevKey));
+    }
+
+    // ORDER BY container_id ASC — matches composite index (state, container_id),
+    // so Derby walks it in order with no sort step.
+    query.addOrderBy(UNHEALTHY_CONTAINERS.CONTAINER_ID.asc());
+
+    if (limit > 0) {
+      query.addLimit(limit);
+    }
+
+    // Controls how many rows Derby returns per JDBC round-trip. Configurable via
+    // ozone.recon.unhealthy.container.fetch.size (default 10,000). JDBC
+    // Statement.setFetchSize rejects negative values, so a misconfigured
+    // setting must not fail every export. Apply it only when positive;
+    // otherwise keep the driver default.
+    if (this.unhealthyContainersFetchSize > 0) {
+      query.fetchSize(this.unhealthyContainersFetchSize);
+    }
+
+    return query.fetchLazy();
+  }
+
/**
* POJO representing a record in UNHEALTHY_CONTAINERS table.
*/
diff --git
a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
index 2b1ca3d2499..6c920dfba8d 100644
---
a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
+++
b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/pages/containers/containers.tsx
@@ -16,10 +16,23 @@
* limitations under the License.
*/
-import React, { useState, useEffect } from "react";
+import React, { useState, useEffect, useRef } from "react";
import moment from "moment";
-import { Card, Row, Tabs } from "antd";
+import {
+ Button,
+ Card,
+ message,
+ Progress,
+ Row,
+ Select,
+ Table,
+ Tag,
+ Tabs,
+ Tooltip,
+} from "antd";
+import { DeleteOutlined, DownloadOutlined, ExportOutlined } from
"@ant-design/icons";
import { ValueType } from "react-select/src/types";
+import { ColumnsType } from "antd/es/table";
import Search from "@/v2/components/search/search";
import MultiSelect, { Option } from "@/v2/components/select/multiSelect";
@@ -35,6 +48,7 @@ import {
ContainersPaginationResponse,
ContainerState,
ExpandedRow,
+ ExportJob,
TabPaginationState,
} from "@/v2/types/container.types";
import { ClusterStateResponse } from "@/v2/types/overview.types";
@@ -52,6 +66,14 @@ const TAB_STATE_MAP: Record<string, string> = {
'5': 'REPLICA_MISMATCH',
};
+const EXPORT_STATE_OPTIONS = [
+ { label: 'Missing', value: 'MISSING' },
+ { label: 'Under-Replicated', value: 'UNDER_REPLICATED' },
+ { label: 'Over-Replicated', value: 'OVER_REPLICATED' },
+ { label: 'Mis-Replicated', value: 'MIS_REPLICATED' },
+ { label: 'Replica Mismatch', value: 'REPLICA_MISMATCH' },
+];
+
const SearchableColumnOpts = [{
label: 'Container ID',
value: 'containerID'
@@ -75,6 +97,8 @@ const DEFAULT_TAB_STATE: TabPaginationState = {
hasNextPage: false,
};
+const POLL_INTERVAL_MS = 3000;
+
const Containers: React.FC<{}> = () => {
const [state, setState] = useState<ContainerState>({
lastUpdated: 0,
@@ -100,6 +124,12 @@ const Containers: React.FC<{}> = () => {
const [selectedTab, setSelectedTab] = useState<string>('1');
const [searchColumn, setSearchColumn] = useState<'containerID' |
'pipelineID'>('containerID');
+ // Export tab state
+ const [exportJobs, setExportJobs] = useState<ExportJob[]>([]);
+ const [selectedExportState, setSelectedExportState] =
useState<string>('MISSING');
+ const [exportSubmitting, setExportSubmitting] = useState<boolean>(false);
+ const pollTimerRef = useRef<ReturnType<typeof setInterval> | null>(null);
+
const debouncedSearch = useDebounce(searchTerm, 300);
const clusterState = useApiData<ClusterStateResponse>(
@@ -121,17 +151,134 @@ const Containers: React.FC<{}> = () => {
}
}, [clusterState.data]);
- // Fetch a single page for a tab using cursor-based pagination.
- // minContainerId=0 means "start from the beginning".
- // currentPageSize is passed explicitly so callers (e.g. size-change
handler) can
- // provide the new value before React state has updated.
+ // ── Polling ──────────────────────────────────────────────────────────────
+ const fetchExportJobs = async () => {
+ try {
+ const jobs = await fetchData<ExportJob[]>(
+ '/api/v1/containers/unhealthy/export'
+ );
+ setExportJobs(jobs ?? []);
+ // Stop polling when no active jobs remain
+ const hasActive = (jobs ?? []).some(
+ j => j.status === 'QUEUED' || j.status === 'RUNNING'
+ );
+ if (!hasActive && pollTimerRef.current) {
+ clearInterval(pollTimerRef.current);
+ pollTimerRef.current = null;
+ }
+ } catch (err) {
+ // Silent — polling errors shouldn't break the UI
+ }
+ };
+
+ const startPolling = () => {
+ if (pollTimerRef.current) return; // already polling
+ fetchExportJobs(); // immediate fetch
+ pollTimerRef.current = setInterval(fetchExportJobs, POLL_INTERVAL_MS);
+ };
+
+ // Start polling when Export tab is active; stop when leaving if no active
jobs.
+ useEffect(() => {
+ if (selectedTab === '6') {
+ startPolling();
+ } else {
+ const hasActive = exportJobs.some(
+ j => j.status === 'QUEUED' || j.status === 'RUNNING'
+ );
+ if (!hasActive && pollTimerRef.current) {
+ clearInterval(pollTimerRef.current);
+ pollTimerRef.current = null;
+ }
+ }
+ return () => {
+ // Do NOT clear on unmount if active jobs exist; React StrictMode
+ // can remount, so we guard with hasActive inside the interval callback.
+ };
+ }, [selectedTab]); // eslint-disable-line react-hooks/exhaustive-deps
+
+ // Clear on component unmount
+ useEffect(() => {
+ return () => {
+ if (pollTimerRef.current) {
+ clearInterval(pollTimerRef.current);
+ }
+ };
+ }, []);
+
+ // ── Export submit ─────────────────────────────────────────────────────────
+  // Submits a CSV export for the selected container state, then refreshes the
+  // job list and starts polling. HTTP errors are surfaced via antd message.
+  const handleSubmitExport = async () => {
+    // The Export button is disabled when an export for this state exists, but
+    // exportJobs may be stale between polls, so re-check here.
+    if (exportJobs.some(
+      j => j.state === selectedExportState
+        && (j.status === 'QUEUED' || j.status === 'RUNNING' || j.status === 'COMPLETED')
+    )) {
+      message.warning(
+        `A ${selectedExportState} export already exists. Delete it from the Completed Exports table to start a new one.`,
+      );
+      return;
+    }
+    setExportSubmitting(true);
+    try {
+      const response = await fetch(
+        `/api/v1/containers/unhealthy/export?state=${selectedExportState}`,
+        { method: 'POST' }
+      );
+      if (!response.ok) {
+        let errorMsg = `Failed to start export (HTTP ${response.status})`;
+        // A response body can be consumed only once. Read it as text first,
+        // then try to parse JSON. Calling response.text() after a failed
+        // response.json() would reject because the body is already used.
+        const text = await response.text().catch(() => '');
+        try {
+          const body = JSON.parse(text);
+          errorMsg = body?.message || body?.error || errorMsg;
+        } catch {
+          if (text && !text.includes('<html>')) errorMsg = text;
+        }
+        // Use a longer duration for queue-full errors so the user has time to read it
+        const duration = response.status === 429 ? 6 : 4;
+        message.error({ content: errorMsg, duration });
+        return;
+      }
+      await fetchExportJobs();
+      startPolling();
+      message.success({ content: 'Export job submitted. Track progress in the table below.', duration: 3 });
+    } catch (err: any) {
+      message.error({ content: `Export failed: ${err.message || err}`, duration: 4 });
+    } finally {
+      setExportSubmitting(false);
+    }
+  };
+
+ // ── Download helper ───────────────────────────────────────────────────────
+ // Uses a hidden <a> so the browser streams the TAR directly to disk
+ // (no in-memory buffering — important for multi-GB exports).
+ const downloadFile = (jobId: string) => {
+ const link = document.createElement('a');
+ link.href = `/api/v1/containers/unhealthy/export/${jobId}/download`;
+ document.body.appendChild(link);
+ link.click();
+ document.body.removeChild(link);
+ // Backend has already incremented downloadCount; refresh so the UI
reflects
+ // the new downloadsRemaining value without waiting for the next poll tick.
+ setTimeout(() => fetchExportJobs(), 500);
+ };
+
+ // ── Delete job helper ─────────────────────────────────────────────────────
+  // Deletes an export job on the server and refreshes the job list.
+  // fetch() resolves on HTTP error statuses, so response.ok is checked
+  // explicitly; otherwise a failed delete would pass silently.
+  const deleteJob = async (jobId: string) => {
+    try {
+      const response = await fetch(
+        `/api/v1/containers/unhealthy/export/${jobId}`,
+        { method: 'DELETE' }
+      );
+      if (!response.ok) {
+        message.error({ content: `Delete failed (HTTP ${response.status})`, duration: 4 });
+      }
+    } catch (err: any) {
+      message.error({ content: `Delete failed: ${err.message || err}`, duration: 4 });
+    }
+    // Refresh regardless of outcome so the table reflects the server's state.
+    fetchExportJobs();
+  };
+
+ // ── Container data fetching ───────────────────────────────────────────────
const fetchTabData = async (
tabKey: string,
minContainerId: number,
currentPageSize: number
) => {
const containerStateName = TAB_STATE_MAP[tabKey];
- // Fetch one extra item so we can detect a next page without a separate
count request.
+ if (!containerStateName) return; // skip Export tab (key='6') or unknown
keys
const fetchSize = currentPageSize + 1;
setTabStates(prev => ({
@@ -147,12 +294,8 @@ const Containers: React.FC<{}> = () => {
);
const allContainers = response.containers ?? [];
- // If we received more than currentPageSize items, a next page exists.
const hasNextPage = allContainers.length > currentPageSize;
- // Always display at most currentPageSize rows.
const containers = allContainers.slice(0, currentPageSize);
- // Derive cursor keys from the visible slice, not the full response,
- // so the next-page request starts exactly after the last displayed row.
const lastKey = containers.length > 0
? Math.max(...containers.map(c => c.containerID))
: 0;
@@ -173,7 +316,6 @@ const Containers: React.FC<{}> = () => {
},
}));
- // Summary counts are returned by every tab endpoint.
setState(prev => ({
...prev,
missingCount: response.missingCount ?? 0,
@@ -192,7 +334,6 @@ const Containers: React.FC<{}> = () => {
}
};
- // Initial fetch on mount.
useEffect(() => {
fetchTabData('1', 0, DEFAULT_PAGE_SIZE);
}, []); // eslint-disable-line react-hooks/exhaustive-deps
@@ -203,8 +344,7 @@ const Containers: React.FC<{}> = () => {
function handleTabChange(key: string) {
setSelectedTab(key);
- // Lazy-load: fetch first page only if the tab has never been loaded.
- if (tabStates[key].data.length === 0 && !tabStates[key].loading) {
+ if (key !== '6' && tabStates[key]?.data.length === 0 &&
!tabStates[key]?.loading) {
fetchTabData(key, 0, pageSize);
}
}
@@ -212,8 +352,6 @@ const Containers: React.FC<{}> = () => {
function handleNextPage(tabKey: string) {
const tab = tabStates[tabKey];
if (tab.loading || !tab.hasNextPage) return;
-
- // Push the current minContainerId so we can navigate back.
setTabStates(prev => ({
...prev,
[tabKey]: {
@@ -227,10 +365,8 @@ const Containers: React.FC<{}> = () => {
function handlePrevPage(tabKey: string) {
const tab = tabStates[tabKey];
if (tab.loading || tab.pageHistory.length === 0) return;
-
const history = [...tab.pageHistory];
const prevMinContainerId = history.pop() ?? 0;
-
setTabStates(prev => ({
...prev,
[tabKey]: { ...prev[tabKey], pageHistory: history },
@@ -238,7 +374,6 @@ const Containers: React.FC<{}> = () => {
fetchTabData(tabKey, prevMinContainerId, pageSize);
}
- // Changing page size resets all tabs and re-fetches the active tab from
page 1.
function handlePageSizeChange(newSize: number) {
setPageSize(newSize);
const reset = {
@@ -252,7 +387,6 @@ const Containers: React.FC<{}> = () => {
fetchTabData(selectedTab, 0, newSize);
}
- // Full refresh: reset all tab states and re-fetch the active tab from page
1.
const loadContainersData = () => {
setTabStates({
'1': { ...DEFAULT_TAB_STATE },
@@ -278,14 +412,176 @@ const Containers: React.FC<{}> = () => {
replicaMismatchCount,
} = state;
- const currentTabState = tabStates[selectedTab];
+ const currentTabState = tabStates[selectedTab] ?? DEFAULT_TAB_STATE;
+
+ // ── Export jobs table helpers ─────────────────────────────────────────────
+ const activeJobs = exportJobs.filter(j => j.status === 'RUNNING' || j.status
=== 'QUEUED');
+ const completedJobs = exportJobs.filter(j => j.status === 'COMPLETED' ||
j.status === 'FAILED');
+ const isStateAlreadyActive = exportJobs.some(
+ j => j.state === selectedExportState
+ && (j.status === 'QUEUED' || j.status === 'RUNNING' || j.status ===
'COMPLETED')
+ );
+
+ const statusColor: Record<string, string> = {
+ QUEUED: 'blue',
+ RUNNING: 'processing',
+ COMPLETED: 'green',
+ FAILED: 'red',
+ };
+
+ const jobIdColumn: ColumnsType<ExportJob>[0] = {
+ title: 'Job ID',
+ dataIndex: 'jobId',
+ key: 'jobId',
+ width: 110,
+ render: (id: string) => (
+ <Tooltip title={id}>
+ <code>{id.substring(0, 8)}</code>
+ </Tooltip>
+ ),
+ };
+
+ const stateColumn: ColumnsType<ExportJob>[0] = {
+ title: 'State',
+ dataIndex: 'state',
+ key: 'state',
+ render: (s: string) => s.replace(/_/g, ' '),
+ };
+
+ const statusColumn: ColumnsType<ExportJob>[0] = {
+ title: 'Status',
+ dataIndex: 'status',
+ key: 'status',
+ width: 120,
+ render: (status: string) => (
+ <Tag color={statusColor[status] ?? 'default'}>{status}</Tag>
+ ),
+ };
+
+ const submittedColumn: ColumnsType<ExportJob>[0] = {
+ title: 'Submitted',
+ dataIndex: 'submittedAt',
+ key: 'submittedAt',
+ render: (ts: number) => ts ? moment(ts).format('MMM D, HH:mm:ss') : '—',
+ };
+
+ const startedColumn: ColumnsType<ExportJob>[0] = {
+ title: 'Started',
+ dataIndex: 'startedAt',
+ key: 'startedAt',
+ render: (ts: number) => ts ? moment(ts).format('MMM D, HH:mm:ss') : '—',
+ };
+
+ // ── Active exports columns (RUNNING / QUEUED) ─────────────────────────────
+ const activeExportColumns: ColumnsType<ExportJob> = [
+ jobIdColumn,
+ stateColumn,
+ statusColumn,
+ {
+ title: 'Queue Position',
+ dataIndex: 'queuePosition',
+ key: 'queuePosition',
+ width: 130,
+ render: (_: number, record: ExportJob) =>
+ record.status === 'QUEUED' && record.queuePosition > 0
+ ? `#${record.queuePosition}`
+ : '—',
+ },
+ submittedColumn,
+ startedColumn,
+ {
+ title: 'Progress',
+ key: 'progress',
+ render: (_: unknown, record: ExportJob) => {
+ if (record.status === 'RUNNING') {
+ const pct = record.progressPercent || 0;
+ const processed = record.totalRecords?.toLocaleString() ?? '0';
+ const total = record.estimatedTotal > 0
+ ? record.estimatedTotal.toLocaleString()
+ : '?';
+ return (
+ <div style={{ minWidth: 160 }}>
+ <Progress percent={pct} size='small' />
+ <div style={{ fontSize: 11, color: '#888', marginTop: 2 }}>
+ {processed} / {total} records
+ </div>
+ </div>
+ );
+ }
+ return '—';
+ },
+ },
+ ];
+
+ // ── Completed exports columns (COMPLETED / FAILED) ────────────────────────
+ const completedExportColumns: ColumnsType<ExportJob> = [
+ jobIdColumn,
+ stateColumn,
+ statusColumn,
+ {
+ title: 'Records',
+ dataIndex: 'totalRecords',
+ key: 'totalRecords',
+ render: (n: number, record: ExportJob) =>
+ record.status === 'COMPLETED' ? (n?.toLocaleString() ?? '—') : '—',
+ },
+ submittedColumn,
+ startedColumn,
+ {
+ title: 'Completed',
+ dataIndex: 'completedAt',
+ key: 'completedAt',
+ render: (ts: number) => ts ? moment(ts).format('MMM D, HH:mm:ss') : '—',
+ },
+ {
+ title: 'Action',
+ key: 'action',
+ render: (_: unknown, record: ExportJob) => {
+ const deleteBtn = (
+ <Button
+ danger
+ size='small'
+ icon={<DeleteOutlined />}
+ onClick={() => deleteJob(record.jobId)}>
+ Delete
+ </Button>
+ );
+ if (record.status === 'COMPLETED') {
+ const limitReached = record.downloadsRemaining === 0;
+ return (
+ <div style={{ display: 'flex', gap: 8 }}>
+ <Button
+ type='primary'
+ size='small'
+ icon={<DownloadOutlined />}
+ disabled={limitReached}
+ onClick={() => downloadFile(record.jobId)}>
+ {limitReached ? 'Limit reached' : `Download
(${record.downloadsRemaining} left)`}
+ </Button>
+ {deleteBtn}
+ </div>
+ );
+ }
+ if (record.status === 'FAILED') {
+ return (
+ <div style={{ display: 'flex', gap: 8 }}>
+ <Tooltip title={record.errorMessage ?? 'Unknown error'}>
+ <span style={{ color: '#ff4d4f', fontSize: 12, alignSelf:
'center' }}>
+ {record.errorMessage ?? 'Failed'}
+ </span>
+ </Tooltip>
+ {deleteBtn}
+ </div>
+ );
+ }
+ return null;
+ },
+ },
+ ];
+ // ── Highlights ────────────────────────────────────────────────────────────
const highlightData = (
- <div style={{
- display: 'flex',
- width: '90%',
- justifyContent: 'space-between'
- }}>
+ <div style={{ display: 'flex', width: '90%', justifyContent:
'space-between' }}>
<div className='highlight-content'>
Total Containers <br/>
<span className='highlight-content-value'>{totalContainers ??
'N/A'}</span>
@@ -329,44 +625,44 @@ const Containers: React.FC<{}> = () => {
<Card
title='Highlights'
loading={currentTabState.loading && missingCount === 0}>
- <Row
- align='middle'>
- {highlightData}
- </Row>
+ <Row align='middle'>{highlightData}</Row>
</Card>
</div>
<div className='content-div'>
- <div className='table-header-section'>
- <div className='table-filter-section'>
- <MultiSelect
- options={columnOptions}
- defaultValue={selectedColumns}
- selected={selectedColumns}
- placeholder='Columns'
- onChange={handleColumnChange}
- fixedColumn='containerID'
- onTagClose={() => { }}
- columnLength={columnOptions.length} />
- </div>
- <Search
- disabled={currentTabState.data.length === 0}
- searchOptions={SearchableColumnOpts}
- searchInput={searchTerm}
- searchColumn={searchColumn}
- onSearchChange={
- (e: React.ChangeEvent<HTMLInputElement>) =>
setSearchTerm(e.target.value)
- }
- onChange={(value) => {
- setSearchTerm('');
- setSearchColumn(value as 'containerID' | 'pipelineID');
- }} />
- </div>
- <Tabs defaultActiveKey='1'
+ <Tabs
+ defaultActiveKey='1'
onChange={(activeKey: string) => handleTabChange(activeKey)}>
+
+ {/* ── Container data tabs ─────────────────────────────────────
*/}
{(['1','2','3','4','5'] as const).map((key) => (
<Tabs.TabPane
key={key}
tab={['Missing','Under-Replicated','Over-Replicated','Mis-Replicated','Mismatched
Replicas'][Number(key)-1]}>
+ <div className='table-header-section'>
+ <div className='table-filter-section'>
+ <MultiSelect
+ options={columnOptions}
+ defaultValue={selectedColumns}
+ selected={selectedColumns}
+ placeholder='Columns'
+ onChange={handleColumnChange}
+ fixedColumn='containerID'
+ onTagClose={() => {}}
+ columnLength={columnOptions.length} />
+ </div>
+ <Search
+ disabled={tabStates[key].data.length === 0}
+ searchOptions={SearchableColumnOpts}
+ searchInput={searchTerm}
+ searchColumn={searchColumn}
+ onSearchChange={
+ (e: React.ChangeEvent<HTMLInputElement>) =>
setSearchTerm(e.target.value)
+ }
+ onChange={(value) => {
+ setSearchTerm('');
+ setSearchColumn(value as 'containerID' | 'pipelineID');
+ }} />
+ </div>
<ContainerTable
data={tabStates[key].data}
loading={tabStates[key].loading}
@@ -384,11 +680,79 @@ const Containers: React.FC<{}> = () => {
/>
</Tabs.TabPane>
))}
+
+ {/* ── Export tab ──────────────────────────────────────────────
*/}
+ <Tabs.TabPane
+ key='6'
+ tab={
+ <span>
+ <ExportOutlined />
+ Export
+ </span>
+ }>
+ <div style={{ marginBottom: 16, display: 'flex', alignItems:
'center', gap: 12 }}>
+ <span style={{ fontWeight: 500 }}>Container State:</span>
+ <Select
+ value={selectedExportState}
+ onChange={(v: string) => setSelectedExportState(v)}
+ options={EXPORT_STATE_OPTIONS}
+ style={{ width: 200 }} />
+ <Tooltip title={isStateAlreadyActive
+ ? `A ${selectedExportState} export already exists. Delete it
to start a new one.`
+ : ''}>
+ <Button
+ type='primary'
+ icon={<ExportOutlined />}
+ loading={exportSubmitting}
+ disabled={isStateAlreadyActive}
+ onClick={handleSubmitExport}>
+ Export CSV
+ </Button>
+ </Tooltip>
+ </div>
+
+ {/* Active Exports */}
+ {activeJobs.length > 0 && (
+ <div style={{ marginBottom: 24 }}>
+ <div style={{ fontWeight: 600, fontSize: 14, marginBottom: 8
}}>
+ Active Exports
+ </div>
+ <Table<ExportJob>
+ rowKey='jobId'
+ dataSource={activeJobs}
+ columns={activeExportColumns}
+ pagination={false}
+ size='middle'
+ locale={{ filterTitle: '' }}
+ />
+ </div>
+ )}
+
+ {/* Completed Exports */}
+ <div>
+ <div style={{ fontWeight: 600, fontSize: 14, marginBottom: 8
}}>
+ Completed Exports
+ </div>
+ <Table<ExportJob>
+ rowKey='jobId'
+ dataSource={completedJobs}
+ columns={completedExportColumns}
+ pagination={{ pageSize: 10, showSizeChanger: false }}
+ size='middle'
+ locale={{
+ emptyText: activeJobs.length === 0
+ ? 'No export jobs yet. Select a state and click Export
CSV.'
+ : 'No completed exports yet.',
+ filterTitle: '',
+ }}
+ />
+ </div>
+ </Tabs.TabPane>
</Tabs>
</div>
</div>
</>
);
-}
+};
export default Containers;
diff --git
a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/types/container.types.ts
b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/types/container.types.ts
index 3b51f36af18..f7461c913f2 100644
---
a/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/types/container.types.ts
+++
b/hadoop-ozone/recon/src/main/resources/webapps/recon/ozone-recon-web/src/v2/types/container.types.ts
@@ -121,4 +121,23 @@ export type ContainerState = {
overReplicatedCount: number;
misReplicatedCount: number;
replicaMismatchCount: number;
+}
+
+export type ExportJobStatus = 'QUEUED' | 'RUNNING' | 'COMPLETED' | 'FAILED';
+
+export type ExportJob = {
+ jobId: string;
+ state: string;
+ status: ExportJobStatus;
+ queuePosition: number;
+ totalRecords: number;
+ estimatedTotal: number;
+ progressPercent: number;
+ fileName: string | null;
+ errorMessage: string | null;
+ submittedAt: number;
+ startedAt: number;
+ completedAt: number;
+ downloadCount: number;
+ downloadsRemaining: number;
}
\ No newline at end of file
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestExportJobManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestExportJobManager.java
new file mode 100644
index 00000000000..6e60b5b6ba9
--- /dev/null
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestExportJobManager.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api;
+
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_EXPORT_DIRECTORY;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_DOWNLOADS;
+import static
org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_EXPORT_MAX_JOBS_TOTAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
+import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob;
+import org.apache.hadoop.ozone.recon.api.types.ExportJob.JobStatus;
+import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager;
+import
org.apache.ozone.recon.schema.ContainerSchemaDefinition.UnHealthyContainerStates;
+import
org.apache.ozone.recon.schema.generated.tables.records.UnhealthyContainersRecord;
+import org.jooq.Cursor;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Unit tests for {@link ExportJobManager}.
+ *
+ * <p>The manager only depends on {@link ContainerHealthSchemaManager} for DB
+ * access; that collaborator is mocked so the tests run end-to-end through the
+ * single-thread worker without spinning up Derby or Guice.</p>
+ */
+class TestExportJobManager {
+
+ private static final String STATE_MISSING = "MISSING";
+ private static final String STATE_UNDER_REPLICATED = "UNDER_REPLICATED";
+ private static final String STATE_OVER_REPLICATED = "OVER_REPLICATED";
+ private static final Duration DEFAULT_AWAIT = Duration.ofSeconds(15);
+
+ @TempDir
+ private Path tempDir;
+
+ private ExportJobManager manager;
+ private ContainerHealthSchemaManager schema;
+
+ @BeforeEach
+ void setUp() {
+ schema = mock(ContainerHealthSchemaManager.class);
+ manager = newManager(/* maxQueueSize */ 4);
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (manager != null) {
+ manager.shutdown();
+ }
+ }
+
+ // ── Submit path ──────────────────────────────────────────────────────────
+
+ @Test
+ void submitCompletesAndProducesTar() throws Exception {
+ stubExport(STATE_MISSING, 5);
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(jobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ ExportJob job = manager.getJob(jobId);
+ assertThat(job.getTotalRecords()).isEqualTo(5L);
+ assertThat(job.getEstimatedTotal()).isEqualTo(5L);
+ assertThat(new File(job.getFilePath())).exists().isFile();
+ assertThat(job.getFilePath()).endsWith(".tar");
+ }
+
+ @Test
+ void submitLargeExportSplitsIntoMultipleCsvParts() throws Exception {
+ // Crosses one million records, which should produce 3 CSV chunks:
+ // part001 (500k), part002 (500k), part003 (remaining).
+ final int recordCount = 1_000_001;
+ stubExport(STATE_MISSING, recordCount);
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(jobId, JobStatus.COMPLETED, Duration.ofMinutes(2));
+
+ ExportJob job = manager.getJob(jobId);
+ assertThat(job.getTotalRecords()).isEqualTo(recordCount);
+ assertThat(new File(job.getFilePath())).exists().isFile();
+
+ List<String> tarEntries = listTarEntryNames(new File(job.getFilePath()));
+ assertThat(tarEntries).hasSize(3);
+ assertThat(tarEntries).contains(
+ "unhealthy_containers_missing_part001.csv",
+ "unhealthy_containers_missing_part002.csv",
+ "unhealthy_containers_missing_part003.csv");
+ }
+
+ @Test
+ void submitEmptyResultStillReachesCompleted() throws Exception {
+ stubExport(STATE_MISSING, 0);
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(jobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ ExportJob job = manager.getJob(jobId);
+ assertThat(job.getTotalRecords()).isZero();
+ assertThat(new File(job.getFilePath())).exists();
+ }
+
+ @Test
+ void submitDuplicateStateWhileRunningIsRejected() throws Exception {
+ CountDownLatch release = new CountDownLatch(1);
+ when(schema.getUnhealthyContainersCount(any(), anyInt(),
anyLong())).thenReturn(1L);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenAnswer(inv -> blockingCursor(release));
+
+ String firstJobId = manager.submitJob(STATE_MISSING);
+ try {
+ awaitStatus(firstJobId, JobStatus.RUNNING, DEFAULT_AWAIT);
+
+ assertThatThrownBy(() -> manager.submitJob(STATE_MISSING))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("already exists");
+ } finally {
+ release.countDown();
+ awaitStatus(firstJobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ }
+ }
+
+ @Test
+ void submitDuplicateStateWhileCompletedIsRejected() throws Exception {
+ stubExport(STATE_MISSING, 2);
+
+ String firstJobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(firstJobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ assertThatThrownBy(() -> manager.submitJob(STATE_MISSING))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("already exists");
+ }
+
+ @Test
+ void submitFailedStateDoesNotBlockRetry() throws Exception {
+ when(schema.getUnhealthyContainersCount(any(), anyInt(),
anyLong())).thenReturn(1L);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenThrow(new RuntimeException("simulated DB failure"))
+ .thenAnswer(inv -> finiteCursor(1));
+
+ String firstJobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(firstJobId, JobStatus.FAILED, DEFAULT_AWAIT);
+
+ String retryJobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(retryJobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ assertThat(retryJobId).isNotEqualTo(firstJobId);
+ }
+
+ @Test
+ void submitQueueFullThrows() throws Exception {
+ // Re-instantiate with the smallest possible queue (1) so we hit the
+ // limit with the fewest distinct states. Layout once first job is
+ // pulled by the worker:
+ // queue = [job2] (size 1, == maxQueueSize)
+ // submit job3 -> rejected
+ manager.shutdown();
+ manager = newManager(/* maxQueueSize */ 1);
+
+ CountDownLatch release = new CountDownLatch(1);
+ when(schema.getUnhealthyContainersCount(any(), anyInt(),
anyLong())).thenReturn(1L);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenAnswer(inv -> blockingCursor(release));
+
+ String running = manager.submitJob(STATE_MISSING);
+ awaitStatus(running, JobStatus.RUNNING, DEFAULT_AWAIT);
+ String waiting = manager.submitJob(STATE_UNDER_REPLICATED);
+
+ try {
+ assertThatThrownBy(() -> manager.submitJob(STATE_OVER_REPLICATED))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("queue is full");
+ } finally {
+ release.countDown();
+ awaitStatus(running, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ awaitStatus(waiting, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ }
+ }
+
+ // ── Cancel path ──────────────────────────────────────────────────────────
+
+ @Test
+ void cancelRunningJobMarksFailedAndRemovesFromTracker() throws Exception {
+ CountDownLatch release = new CountDownLatch(1);
+ when(schema.getUnhealthyContainersCount(any(), anyInt(),
anyLong())).thenReturn(1L);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenAnswer(inv -> blockingCursor(release));
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ try {
+ awaitStatus(jobId, JobStatus.RUNNING, DEFAULT_AWAIT);
+
+ manager.cancelJob(jobId);
+
+ assertThat(manager.getJob(jobId)).isNull();
+ assertThat(new File(tempDir.toFile(), jobId)).doesNotExist();
+ } finally {
+ release.countDown();
+ }
+ }
+
+ @Test
+ void cancelCompletedJobDeletesTarFile() throws Exception {
+ stubExport(STATE_MISSING, 2);
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(jobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ String tarPath = manager.getJob(jobId).getFilePath();
+ assertThat(new File(tarPath)).exists();
+
+ manager.cancelJob(jobId);
+
+ assertThat(manager.getJob(jobId)).isNull();
+ assertThat(new File(tarPath)).doesNotExist();
+ }
+
+ @Test
+ void cancelUnknownJobIdThrows() {
+ assertThatThrownBy(() -> manager.cancelJob("does-not-exist"))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("Job not found");
+ }
+
+ // ── Queue + listing ──────────────────────────────────────────────────────
+
+ @Test
+ void getQueuePositionAssignsSequentialPositions() throws Exception {
+ CountDownLatch release = new CountDownLatch(1);
+ when(schema.getUnhealthyContainersCount(any(), anyInt(),
anyLong())).thenReturn(1L);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenAnswer(inv -> blockingCursor(release));
+
+ String running = manager.submitJob(STATE_MISSING);
+ awaitStatus(running, JobStatus.RUNNING, DEFAULT_AWAIT);
+
+ String waiting1 = manager.submitJob(STATE_UNDER_REPLICATED);
+ String waiting2 = manager.submitJob(STATE_OVER_REPLICATED);
+
+ try {
+ assertThat(manager.getQueuePosition(running)).isZero(); //
already running
+ assertThat(manager.getQueuePosition(waiting1)).isEqualTo(1);
+ assertThat(manager.getQueuePosition(waiting2)).isEqualTo(2);
+ } finally {
+ release.countDown();
+ awaitStatus(running, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ awaitStatus(waiting1, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ awaitStatus(waiting2, JobStatus.COMPLETED, DEFAULT_AWAIT);
+ }
+ }
+
+ @Test
+ void getAllJobsReturnsEveryTrackedJob() throws Exception {
+ stubExport(STATE_MISSING, 1);
+
+ String first = manager.submitJob(STATE_MISSING);
+ awaitStatus(first, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ String second = manager.submitJob(STATE_UNDER_REPLICATED);
+ awaitStatus(second, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ List<ExportJob> jobs = manager.getAllJobs();
+ assertThat(jobs).hasSize(2);
+
assertThat(jobs).extracting(ExportJob::getJobId).containsExactlyInAnyOrder(first,
second);
+ }
+
+ // ── Startup cleanup ──────────────────────────────────────────────────────
+
+ @Test
+ void startupCleanupRemovesLeftoverTarsAndDirs() throws Exception {
+ manager.shutdown();
+
+ File leftoverTar = new File(tempDir.toFile(), "stale_export.tar");
+ Files.write(leftoverTar.toPath(), new byte[]{1, 2, 3});
+
+ File leftoverDir = new File(tempDir.toFile(), "stale_job_id");
+ Files.createDirectories(leftoverDir.toPath());
+ Files.write(new File(leftoverDir, "part1.csv").toPath(), new byte[]{4, 5});
+
+ File unrelated = new File(tempDir.toFile(), "keep.txt");
+ Files.write(unrelated.toPath(), new byte[]{6});
+
+ manager = newManager(/* maxQueueSize */ 4);
+
+ assertThat(leftoverTar).doesNotExist();
+ assertThat(leftoverDir).doesNotExist();
+ assertThat(unrelated).exists();
+ }
+
+ // ── Filename derivation ─────────────────────────────────────────────────
+
+ @Test
+ void submitFileNameMatchesExpectedPattern() throws Exception {
+ stubExport(STATE_MISSING, 1);
+
+ String jobId = manager.submitJob(STATE_MISSING);
+ awaitStatus(jobId, JobStatus.COMPLETED, DEFAULT_AWAIT);
+
+ ExportJob job = manager.getJob(jobId);
+ assertThat(job.getFileName()).matches("^export_missing_\\d+\\.tar$");
+ }
+
+ // ── Helpers ──────────────────────────────────────────────────────────────
+
+ private ExportJobManager newManager(int maxQueueSize) {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(OZONE_RECON_EXPORT_DIRECTORY, tempDir.toString());
+ conf.setInt(OZONE_RECON_EXPORT_MAX_JOBS_TOTAL, maxQueueSize);
+ conf.setInt(OZONE_RECON_EXPORT_MAX_DOWNLOADS, 3);
+ return new ExportJobManager(schema, conf);
+ }
+
+ /**
+ * Configures the schema mock so any export returns a finite cursor of
+ * {@code recordCount} fake rows. Validates {@code state} eagerly so a bad
+ * test fixture fails fast rather than from inside the worker thread.
+ */
+ private void stubExport(String state, int recordCount) {
+ UnHealthyContainerStates.valueOf(state);
+ when(schema.getUnhealthyContainersCount(any(), anyInt(), anyLong()))
+ .thenReturn((long) recordCount);
+ when(schema.getUnhealthyContainersCursor(any(), anyInt(), anyLong()))
+ .thenAnswer(inv -> finiteCursor(recordCount));
+ }
+
+ /**
+ * Returns a fresh mock cursor that yields {@code recordCount} fake records.
+ */
+ @SuppressWarnings("unchecked")
+ private static Cursor<UnhealthyContainersRecord> finiteCursor(int
recordCount) {
+ Cursor<UnhealthyContainersRecord> cursor = mock(Cursor.class);
+ AtomicInteger pos = new AtomicInteger(0);
+ when(cursor.hasNext()).thenAnswer(inv -> pos.get() < recordCount);
+ when(cursor.fetchNext()).thenAnswer(inv -> {
+ int i = pos.getAndIncrement();
+ UnhealthyContainersRecord rec = new UnhealthyContainersRecord();
+ rec.setContainerId((long) (i + 1));
+ rec.setContainerState("MISSING");
+ rec.setInStateSince(0L);
+ rec.setExpectedReplicaCount(3);
+ rec.setActualReplicaCount(2);
+ rec.setReplicaDelta(1);
+ return rec;
+ });
+ return cursor;
+ }
+
+ /**
+ * Returns a fresh mock cursor whose {@code hasNext()} blocks until the
+ * provided latch is counted down. Used to keep a running job suspended
+ * mid-execution so the test can observe RUNNING status. If the worker
+ * thread is interrupted (e.g. by {@code cancelJob}) we propagate via an
+ * unchecked exception so the manager's outer {@code catch (Exception)}
+ * runs the failure / cleanup path.
+ */
+ @SuppressWarnings("unchecked")
+ private static Cursor<UnhealthyContainersRecord>
blockingCursor(CountDownLatch release) {
+ Cursor<UnhealthyContainersRecord> cursor = mock(Cursor.class);
+ when(cursor.hasNext()).thenAnswer(inv -> {
+ try {
+ release.await();
+ return false;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("blockingCursor interrupted", e);
+ }
+ });
+ return cursor;
+ }
+
+ private static List<String> listTarEntryNames(File tarFile) throws Exception
{
+ List<String> entries = new ArrayList<>();
+ try (TarArchiveInputStream tis =
+ new
TarArchiveInputStream(Files.newInputStream(tarFile.toPath()))) {
+ TarArchiveEntry entry;
+ while ((entry = tis.getNextTarEntry()) != null) {
+ if (!entry.isDirectory()) {
+ entries.add(entry.getName());
+ }
+ }
+ }
+ return entries;
+ }
+
+ private void awaitStatus(String jobId, JobStatus target, Duration timeout)
throws Exception {
+ long deadlineNanos = System.nanoTime() + timeout.toNanos();
+ while (System.nanoTime() < deadlineNanos) {
+ ExportJob job = manager.getJob(jobId);
+ if (job != null && job.getStatus() == target) {
+ return;
+ }
+ TimeUnit.MILLISECONDS.sleep(50);
+ }
+ ExportJob latest = manager.getJob(jobId);
+ throw new AssertionError("Timed out waiting for job " + jobId + " to reach
" + target
+ + "; last observed=" + (latest == null ? "<absent>" :
latest.getStatus()));
+ }
+}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/types/TestExportJob.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/types/TestExportJob.java
new file mode 100644
index 00000000000..c3069c5eced
--- /dev/null
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/types/TestExportJob.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.recon.api.types;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.jupiter.api.Test;
+
+/**
+ * Unit tests for the {@link ExportJob} POJO.
+ *
+ * Focuses on the small piece of business logic baked into the model:
+ * the per-job download counter and the file path -> file name derivation.
+ */
+class TestExportJob {
+
+ @Test
+ void downloadAllowedInitiallyTrue() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+
+ assertThat(job.isDownloadAllowed()).isTrue();
+ assertThat(job.getDownloadCount()).isZero();
+ assertThat(job.getMaxDownloads()).isEqualTo(3);
+ assertThat(job.getDownloadsRemaining()).isEqualTo(3);
+ }
+
+ @Test
+ void tryReserveDownloadDecrementsRemaining() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+
+ assertThat(job.tryReserveDownload()).isTrue();
+ assertThat(job.getDownloadCount()).isEqualTo(1);
+ assertThat(job.getDownloadsRemaining()).isEqualTo(2);
+ assertThat(job.isDownloadAllowed()).isTrue();
+ }
+
+ @Test
+ void downloadAllowedFalseAtLimit() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+
+ assertThat(job.tryReserveDownload()).isTrue();
+ assertThat(job.tryReserveDownload()).isTrue();
+ assertThat(job.tryReserveDownload()).isTrue();
+
+ assertThat(job.isDownloadAllowed()).isFalse();
+ assertThat(job.getDownloadsRemaining()).isZero();
+ assertThat(job.tryReserveDownload()).isFalse();
+ }
+
+ @Test
+ void downloadsRemainingNeverNegative() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 1);
+
+ assertThat(job.tryReserveDownload()).isTrue();
+ assertThat(job.tryReserveDownload()).isFalse();
+ assertThat(job.tryReserveDownload()).isFalse();
+
+ assertThat(job.getDownloadsRemaining()).isZero();
+ assertThat(job.isDownloadAllowed()).isFalse();
+ assertThat(job.getDownloadCount()).isEqualTo(1);
+ }
+
+ @Test
+ void setFilePathDerivesFileName() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+
+ job.setFilePath("export_missing_1736000000000.tar");
+
+ assertThat(job.getFilePath())
+ .isEqualTo("export_missing_1736000000000.tar");
+
assertThat(job.getFileName()).isEqualTo("export_missing_1736000000000.tar");
+ }
+
+ @Test
+ void setFilePathNullClearsFileName() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+ job.setFilePath("export_missing_1.tar");
+ assertThat(job.getFileName()).isEqualTo("export_missing_1.tar");
+
+ job.setFilePath(null);
+
+ assertThat(job.getFilePath()).isNull();
+ assertThat(job.getFileName()).isNull();
+ }
+
+ @Test
+ void initialStatusIsQueued() {
+ ExportJob job = new ExportJob("job-1", "MISSING", 3);
+
+ assertThat(job.getStatus()).isEqualTo(ExportJob.JobStatus.QUEUED);
+ assertThat(job.getSubmittedAt()).isPositive();
+ assertThat(job.getEstimatedTotal()).isEqualTo(-1);
+ assertThat(job.getTotalRecords()).isZero();
+ }
+}
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestReconReplicationManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestReconReplicationManager.java
index 64867279169..0678caa86eb 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestReconReplicationManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/fsck/TestReconReplicationManager.java
@@ -88,7 +88,7 @@ public TestReconReplicationManager() {
public void setUp() throws Exception {
dao = getDao(UnhealthyContainersDao.class);
schemaManagerV2 = new ContainerHealthSchemaManager(
- getSchemaDefinition(ContainerSchemaDefinition.class));
+ getSchemaDefinition(ContainerSchemaDefinition.class), new
OzoneConfiguration());
containerManager = mock(ContainerManager.class);
PlacementPolicy placementPolicy = mock(PlacementPolicy.class);
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUnhealthyContainersDerbyPerformance.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUnhealthyContainersDerbyPerformance.java
index 5cc90e88409..cb1094dea53 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUnhealthyContainersDerbyPerformance.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/persistence/TestUnhealthyContainersDerbyPerformance.java
@@ -35,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.ozone.recon.ReconControllerModule.ReconDaoBindingModule;
import org.apache.hadoop.ozone.recon.ReconSchemaManager;
import
org.apache.hadoop.ozone.recon.persistence.AbstractReconSqlDBTest.DerbyDataSourceConfigurationProvider;
@@ -273,7 +274,7 @@ protected void configure() {
dao = injector.getInstance(UnhealthyContainersDao.class);
schemaDefinition = injector.getInstance(ContainerSchemaDefinition.class);
- schemaManager = new ContainerHealthSchemaManager(schemaDefinition);
+ schemaManager = new ContainerHealthSchemaManager(schemaDefinition, new
OzoneConfiguration());
}
// -----------------------------------------------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]