This is an automated email from the ASF dual-hosted git repository.

frankgh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-sidecar.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8c44dcab2 CASSSIDECAR-409: Add safety check to Live Migration data copy endpoint (#321)
8c44dcab2 is described below

commit 8c44dcab2440ed3972e1b0f74cc3a2457efc7316
Author: N V Harikrishna <[email protected]>
AuthorDate: Fri Mar 27 04:02:28 2026 +0530

    CASSSIDECAR-409: Add safety check to Live Migration data copy endpoint (#321)
    
    Patch by N V Harikrishna; reviewed by Francisco Guerrero, Yifan Cai for CASSSIDECAR-409
---
 CHANGES.txt                                        |   1 +
 .../request/LiveMigrationDataCopyRequest.java      |   2 +-
 .../sidecar/routes/RepairIntegrationTest.java      |   2 +-
 .../LiveMigrationCreateDataCopyTaskHandler.java    |   2 +-
 .../sidecar/livemigration/DataCopyTaskManager.java |  93 ++++++++++++------
 .../LiveMigrationFileDownloadPreCheck.java         | 105 +++++++++++++++++++++
 .../livemigration/LiveMigrationFileDownloader.java |  73 +++++++++++++-
 .../LiveMigrationTaskFactoryImpl.java              |   7 +-
 .../livemigration/LiveMigrationTaskImpl.java       |   5 +-
 .../sidecar/modules/LiveMigrationModule.java       |   2 +
 .../cassandra/sidecar/HelperTestModules.java       |   1 +
 .../livemigration/DataCopyTaskManagerTest.java     |  75 ++++++++++++++-
 .../LiveMigrationFileDownloaderTest.java           |  96 +++++++++++++++++++
 .../livemigration/LiveMigrationTaskImplTest.java   |   2 +-
 14 files changed, 430 insertions(+), 36 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 538baacb3..3b382f539 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 0.4.0
 -----
+ * Add safety check to Live Migration data copy task endpoint (CASSSIDECAR-409)
  * Define common operational job tracking interface and refactor current 
operational job tracker (CASSSIDECAR-372)
 
 0.3.0
diff --git 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
index ebb447d69..389fa3941 100644
--- 
a/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
+++ 
b/client-common/src/main/java/org/apache/cassandra/sidecar/common/request/LiveMigrationDataCopyRequest.java
@@ -45,7 +45,7 @@ public class LiveMigrationDataCopyRequest
     public final int maxConcurrency;
 
     /**
-     * Creates a new request with auto-generated ID.
+     * Creates a new live migration data copy request.
      */
     @JsonCreator
     public LiveMigrationDataCopyRequest(@JsonProperty("maxIterations") int 
maxIterations,
diff --git 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
index 7c6b1776f..71495a414 100644
--- 
a/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
+++ 
b/integration-tests/src/integrationTest/org/apache/cassandra/sidecar/routes/RepairIntegrationTest.java
@@ -214,7 +214,7 @@ class RepairIntegrationTest extends 
SharedClusterSidecarIntegrationTestBase
     private void pollStatusForState(String uuid)
     {
         String status = "/api/v1/cassandra/operational-jobs/" + uuid;
-        loopAssert(30, 500, () -> {
+        loopAssert(60, 500, () -> {
             HttpResponse<Buffer> resp = 
getBlocking(trustedClient().get(serverWrapper.serverPort, "localhost", status)
                                                                    .send());
             logger.info("Success Status Response code: {}", resp.statusCode());
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
index b6875d982..14fe60b6e 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/handlers/livemigration/LiveMigrationCreateDataCopyTaskHandler.java
@@ -103,7 +103,7 @@ public class LiveMigrationCreateDataCopyTaskHandler extends 
AbstractHandler<Live
         .onFailure(throwable -> {
             if (throwable instanceof LiveMigrationInvalidRequestException)
             {
-                LOGGER.error("Input payload is not valid.", throwable);
+                LOGGER.error("Invalid live migration request.", throwable);
                 context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, 
throwable.getMessage(), throwable));
             }
             else if (throwable instanceof 
LiveMigrationDataCopyInProgressException)
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
index 719b93d35..faf87080b 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManager.java
@@ -31,10 +31,12 @@ import com.datastax.driver.core.utils.UUIDs;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import 
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
@@ -101,39 +103,76 @@ public class DataCopyTaskManager
                                                  String source,
                                                  InstanceMetadata 
localInstanceMetadata)
     {
-        LiveMigrationTask newTask = createTask(request,
-                                               source,
-                                               
sidecarConfiguration.serviceConfiguration().port(),
-                                               localInstanceMetadata);
-
-        // It is possible to serve only one live migration data copy request 
per instance at a time.
-        // Checking if there is another migration is in progress before 
accepting new one.
-        boolean accepted = currentTasks.compute(localInstanceMetadata.id(), 
(integer, taskInMap) -> {
-            if (taskInMap == null)
-            {
-                return newTask;
-            }
+        // Fast local JMX/native check before creating task - prevents task creation if Cassandra is running
+        return verifyCassandraNotRunning(localInstanceMetadata)
+               .compose(v -> {
+                   LiveMigrationTask newTask = createTask(request,
+                                                          source,
+                                                          
sidecarConfiguration.serviceConfiguration().port(),
+                                                          
localInstanceMetadata);
+
+                   // Only one live migration data copy request can be served per instance at a time.
+                   // Check whether another migration is in progress before accepting a new one.
+                   boolean accepted = newTask == 
currentTasks.compute(localInstanceMetadata.id(), (integer, taskInMap) -> {
+                       if (taskInMap == null)
+                       {
+                           return newTask;
+                       }
+
+                       // Accept new task if and only if the existing task has 
completed.
+                       return taskInMap.isCompleted() ? newTask : taskInMap;
+                   });
+
+                   if (!accepted)
+                   {
+                       return Future.failedFuture(
+                       new LiveMigrationDataCopyInProgressException("Another 
task is already under progress. Cannot accept new task."));
+                   }
+                   LOGGER.info("Starting data copy task with id={}, source={}, 
destination={}",
+                               newTask.id(), source, 
localInstanceMetadata.host());
+                   newTask.start();
+                   return Future.succeededFuture(newTask);
+               });
+    }
 
-            if (!taskInMap.isCompleted())
-            {
-                // Accept new task if and only if the existing task has 
completed.
-                return taskInMap;
-            }
-            else
+    /**
+     * Data copy must not be initiated while Cassandra is running on the destination instance. This method
+     * checks whether Cassandra is currently running on the destination instance by checking if Sidecar
+     * is able to connect to the Cassandra instance's JMX port or native (CQL) port. It returns a failed
+     * future if Sidecar is able to connect to either port of Cassandra.
+     *
+     * @param localInstance metadata for the local Cassandra instance
+     * @return Future that succeeds if Cassandra is not running, fails if it 
is running
+     */
+    private Future<Void> verifyCassandraNotRunning(InstanceMetadata 
localInstance)
+    {
+        try
+        {
+            CassandraAdapterDelegate delegate = localInstance.delegate();
+
+            if (delegate.isJmxUp() || delegate.isNativeUp())
             {
-                return newTask;
+                return Future.failedFuture(new 
LiveMigrationInvalidRequestException(
+                "Cannot start data copy: Cassandra is currently running on 
this instance " +
+                "(JMX or native connectivity established). Data copy cannot 
proceed while Cassandra is active."));
             }
-        }) == newTask;
 
-        if (!accepted)
+            // JMX and native are down - Cassandra is not running (or at least 
wasn't during last health check)
+            LOGGER.debug("Local JMX and native check passed: Cassandra not 
detected as running on {}", localInstance.host());
+            return Future.succeededFuture();
+        }
+        catch (CassandraUnavailableException e)
         {
-            return Future.failedFuture(
-            new LiveMigrationDataCopyInProgressException("Another task is 
already under progress. Cannot accept new task."));
+            // No delegate available - Cassandra is not running
+            LOGGER.debug("No Cassandra delegate available for {} (Cassandra 
not running)", localInstance.host());
+            return Future.succeededFuture();
+        }
+        catch (Exception e)
+        {
+            // Unexpected error - be conservative and reject for safety
+            LOGGER.warn("Unable to verify Cassandra status on {}, rejecting 
for safety", localInstance.host(), e);
+            return Future.failedFuture(e);
         }
-        LOGGER.info("Starting data copy task with id={}, source={}, 
destination={}",
-                    newTask.id(), source, localInstanceMetadata.host());
-        newTask.start();
-        return Future.succeededFuture(newTask);
     }
 
     LiveMigrationTask createTask(LiveMigrationDataCopyRequest request,
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
new file mode 100644
index 000000000..71abb8a8e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloadPreCheck.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.livemigration;
+
+import io.vertx.core.Future;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import 
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
+
+/**
+ * A pluggable pre-check hook that runs before each file download iteration in 
the live migration
+ * data copy process. Since the data copy involves deleting local files and 
downloading files from
+ * the source, implementations can perform safety validations (e.g., verifying 
cluster state via gossip,
+ * checking instance readiness) to prevent data corruption or unsafe 
operations.
+ *
+ * <p>The pre-check is invoked at the beginning of every download iteration 
(not just the first one),
+ * allowing implementations to continuously validate that conditions remain 
safe throughout the
+ * multi-iteration copy process.</p>
+ *
+ * <p>A {@link #DEFAULT} no-op implementation is provided for cases where no 
pre-check is needed.
+ * Custom implementations can be bound via Guice to override the default 
behavior.</p>
+ *
+ * <p>Example use cases:</p>
+ * <ul>
+ *   <li>Gossip-based validation: verify the source node is present and the 
destination node
+ *       is absent from cluster gossip, preventing data copy to a node that 
has already joined</li>
+ *   <li>Instance state checks: verify the local Cassandra process is not 
running</li>
+ *   <li>Disk space validation: ensure sufficient space before downloading</li>
+ * </ul>
+ *
+ * @see LiveMigrationFileDownloader#downloadFiles()
+ */
+public interface LiveMigrationFileDownloadPreCheck
+{
+    /**
+     * Default no-op implementation that always succeeds. Used when no 
pre-check validation is required.
+     */
+    LiveMigrationFileDownloadPreCheck DEFAULT = context -> 
Future.succeededFuture();
+
+    /**
+     * Performs a safety check before a file download iteration begins.
+     *
+     * <p>Implementations should return a succeeded future if the check passes 
(it is safe to proceed),
+     * or a failed future with a descriptive exception if the check fails 
(download should be aborted).</p>
+     *
+     * @param context provides access to the source host, destination instance 
metadata,
+     *                sidecar port, and the data copy request parameters 
needed for validation
+     * @return a succeeded {@link Future} if the pre-check passes, a failed 
{@link Future} otherwise
+     */
+    Future<Void> doCheck(PreCheckContext context);
+
+    /**
+     * Encapsulates the contextual information available to a {@link 
LiveMigrationFileDownloadPreCheck}
+     * implementation. This context is populated by the {@link 
LiveMigrationFileDownloader} before each
+     * download iteration.
+     *
+     * <p>Provides access to:</p>
+     * <ul>
+     *   <li>{@link #source()} - hostname of the source instance being copied 
from</li>
+     *   <li>{@link #destinationInstanceMetadata()} - metadata of the 
local/destination instance,
+     *       including host, data directories, and the Cassandra adapter 
delegate</li>
+     *   <li>{@link #sidecarPort()} - port on which Sidecar is running, useful 
for making
+     *       Sidecar API calls to other cluster instances</li>
+     *   <li>{@link #request()} - the original data copy request containing 
task parameters</li>
+     * </ul>
+     */
+    interface PreCheckContext
+    {
+        /**
+         * @return the hostname of the source instance from which files are 
being copied
+         */
+        String source();
+
+        /**
+         * @return the metadata for the destination (local) instance where 
files will be written
+         */
+        InstanceMetadata destinationInstanceMetadata();
+
+        /**
+         * @return the Sidecar service port, useful for contacting other 
Sidecar instances in the cluster
+         */
+        int sidecarPort();
+
+        /**
+         * @return the original data copy request containing task parameters 
such as max iterations,
+         *         success threshold, and max concurrency
+         */
+        LiveMigrationDataCopyRequest request();
+    }
+}
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
index 4f918808f..5e0142d98 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloader.java
@@ -81,6 +81,7 @@ class LiveMigrationFileDownloader
     private final int port;
     private final String logPrefix;
     private final ExecutorPools executorPools;
+    private final LiveMigrationFileDownloadPreCheck preCheck;
     private OperationStatus operationStatus;
     private AsyncConcurrentTaskExecutor<Void> concurrentTaskExecutor;
 
@@ -97,6 +98,7 @@ class LiveMigrationFileDownloader
         this.source = builder.source;
         this.port = builder.port;
         this.executorPools = builder.executorPools;
+        this.preCheck = builder.preCheck;
 
         this.operationStatus = OperationStatus.startingState();
         this.logPrefix = String.format("liveMigrationRequest=%s iteration=%s 
", id, iteration);
@@ -115,7 +117,8 @@ class LiveMigrationFileDownloader
      */
     public Future<OperationStatus> downloadFiles()
     {
-        return checkLiveMigrationStatusOfSource()
+        return runPreCheck()
+               .compose(v -> checkLiveMigrationStatusOfSource())
                .compose(v -> fetchSourceFileList())
                .compose(this::cleanupUnnecessaryFiles)
                .compose(this::prepareDownloadList)
@@ -124,6 +127,14 @@ class LiveMigrationFileDownloader
                .otherwise(this::handleDownloadFailure);
     }
 
+    private Future<Void> runPreCheck()
+    {
+        LiveMigrationFileDownloadPreCheck.PreCheckContext context
+        = new PreCheckContextImpl(source, instanceMetadata, port, request);
+        return preCheck.doCheck(context)
+               .onSuccess(v -> LOGGER.debug("{} Pre-check completed 
successfully. Proceeding with data copy.", logPrefix))
+               .onFailure(throwable -> LOGGER.error("{} Pre-check failed.", 
logPrefix, throwable));
+    }
 
     /**
      * Checks whether the live migration status at the source is NOT_COMPLETED 
or COMPLETED.
@@ -572,6 +583,7 @@ class LiveMigrationFileDownloader
         private String id;
         private String source;
         private int port;
+        private LiveMigrationFileDownloadPreCheck preCheck;
 
         protected Builder()
         {
@@ -704,6 +716,17 @@ class LiveMigrationFileDownloader
             return update(b -> b.executorPools = executorPools);
         }
 
+        /**
+         * Sets the {@code preCheck} instance and returns a reference to this Builder enabling method chaining.
+         *
+         * @param preCheck the {@code preCheck} to set
+         * @return a reference to this Builder
+         */
+        public Builder preCheck(LiveMigrationFileDownloadPreCheck preCheck)
+        {
+            return update(b -> b.preCheck = preCheck);
+        }
+
         /**
          * Returns a {@code LiveMigrationFileDownloader} built from the 
parameters previously set.
          *
@@ -721,6 +744,7 @@ class LiveMigrationFileDownloader
             Objects.requireNonNull(request);
             Objects.requireNonNull(source);
             Objects.requireNonNull(executorPools);
+            Objects.requireNonNull(preCheck);
 
             return new LiveMigrationFileDownloader(this);
         }
@@ -744,4 +768,51 @@ class LiveMigrationFileDownloader
             this.lastModifiedTime = lastModifiedTime;
         }
     }
+
+    /**
+     * Implementation of {@link 
LiveMigrationFileDownloadPreCheck.PreCheckContext} that provides
+     * the downloader's context to pre-check implementations.
+     */
+    private static class PreCheckContextImpl implements 
LiveMigrationFileDownloadPreCheck.PreCheckContext
+    {
+        private final String source;
+        private final InstanceMetadata destinationInstanceMetadata;
+        private final int sidecarPort;
+        private final LiveMigrationDataCopyRequest request;
+
+        PreCheckContextImpl(String source,
+                            InstanceMetadata destinationInstanceMetadata,
+                            int sidecarPort,
+                            LiveMigrationDataCopyRequest request)
+        {
+            this.source = source;
+            this.destinationInstanceMetadata = destinationInstanceMetadata;
+            this.sidecarPort = sidecarPort;
+            this.request = request;
+        }
+
+        @Override
+        public String source()
+        {
+            return source;
+        }
+
+        @Override
+        public InstanceMetadata destinationInstanceMetadata()
+        {
+            return destinationInstanceMetadata;
+        }
+
+        @Override
+        public int sidecarPort()
+        {
+            return sidecarPort;
+        }
+
+        @Override
+        public LiveMigrationDataCopyRequest request()
+        {
+            return request;
+        }
+    }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
index bc98c2cc9..4c254f20c 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskFactoryImpl.java
@@ -39,17 +39,20 @@ public class LiveMigrationTaskFactoryImpl implements 
LiveMigrationTaskFactory
     private final SidecarClientProvider sidecarClientProvider;
     private final LiveMigrationConfiguration liveMigrationConfiguration;
     private final ExecutorPools executorPools;
+    private final LiveMigrationFileDownloadPreCheck preCheck;
 
     @Inject
     public LiveMigrationTaskFactoryImpl(Vertx vertx,
                                         ExecutorPools executorPools,
                                         SidecarClientProvider 
sidecarClientProvider,
-                                        SidecarConfiguration 
sidecarConfiguration)
+                                        SidecarConfiguration 
sidecarConfiguration,
+                                        LiveMigrationFileDownloadPreCheck 
preCheck)
     {
         this.vertx = vertx;
         this.executorPools = executorPools;
         this.sidecarClientProvider = sidecarClientProvider;
         this.liveMigrationConfiguration = 
sidecarConfiguration.liveMigrationConfiguration();
+        this.preCheck = preCheck;
     }
 
     /**
@@ -63,6 +66,6 @@ public class LiveMigrationTaskFactoryImpl implements 
LiveMigrationTaskFactory
                                     InstanceMetadata instanceMetadata)
     {
         return new LiveMigrationTaskImpl(vertx, executorPools, 
sidecarClientProvider, liveMigrationConfiguration,
-                                         id, request, source, port, 
instanceMetadata);
+                                         id, request, source, port, 
instanceMetadata, preCheck);
     }
 }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
index 723f35de7..c64b14be4 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImpl.java
@@ -52,6 +52,7 @@ public class LiveMigrationTaskImpl implements 
LiveMigrationTask
     private final ExecutorPools executorPools;
     private final SidecarClientProvider sidecarClientProvider;
     private final LiveMigrationConfiguration liveMigrationConfiguration;
+    private final LiveMigrationFileDownloadPreCheck preCheck;
 
     // Indicates overall status of the operation (succeeded or failed).
     // Future returned by downloader changes on next iteration. Using a 
separate future to track overall operation.
@@ -69,7 +70,7 @@ public class LiveMigrationTaskImpl implements 
LiveMigrationTask
                                  LiveMigrationDataCopyRequest request,
                                  String source,
                                  int port,
-                                 InstanceMetadata instanceMetadata)
+                                 InstanceMetadata instanceMetadata, 
LiveMigrationFileDownloadPreCheck preCheck)
     {
         this.vertx = vertx;
         this.executorPools = executorPools;
@@ -80,6 +81,7 @@ public class LiveMigrationTaskImpl implements 
LiveMigrationTask
         this.instanceMetadata = instanceMetadata;
         this.source = source;
         this.port = port;
+        this.preCheck = preCheck;
     }
 
     /**
@@ -130,6 +132,7 @@ public class LiveMigrationTaskImpl implements 
LiveMigrationTask
                                                 .source(source)
                                                 .port(port)
                                                 .executorPools(executorPools)
+                                                .preCheck(preCheck)
                                                 .build();
         return downloader.downloadFiles();
     }
diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
index 9cb95c836..4c10c5180 100644
--- 
a/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
+++ 
b/server/src/main/java/org/apache/cassandra/sidecar/modules/LiveMigrationModule.java
@@ -42,6 +42,7 @@ import 
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationMapSidec
 import 
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusClearHandler;
 import 
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusCompleteHandler;
 import 
org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationStatusGetHandler;
+import 
org.apache.cassandra.sidecar.livemigration.LiveMigrationFileDownloadPreCheck;
 import org.apache.cassandra.sidecar.livemigration.LiveMigrationStatusTracker;
 import 
org.apache.cassandra.sidecar.livemigration.LiveMigrationStatusTrackerImpl;
 import org.apache.cassandra.sidecar.livemigration.LiveMigrationTaskFactory;
@@ -69,6 +70,7 @@ public class LiveMigrationModule extends AbstractModule
         
bind(LiveMigrationMap.class).to(LiveMigrationMapSidecarConfigImpl.class);
         
bind(LiveMigrationTaskFactory.class).to(LiveMigrationTaskFactoryImpl.class);
         
bind(LiveMigrationStatusTracker.class).to(LiveMigrationStatusTrackerImpl.class);
+        
bind(LiveMigrationFileDownloadPreCheck.class).toInstance(LiveMigrationFileDownloadPreCheck.DEFAULT);
     }
 
     @GET
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java 
b/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
index eb671308b..87674d82e 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/HelperTestModules.java
@@ -80,6 +80,7 @@ public class HelperTestModules
             this.instanceMetadataList = instanceMetadataList;
         }
 
+        @Override
         protected void configure()
         {
             InstancesMetadata mockInstancesMetadata = 
mock(InstancesMetadata.class);
diff --git 
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
 
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
index 996663753..52bafd6e8 100644
--- 
a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
+++ 
b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/DataCopyTaskManagerTest.java
@@ -33,6 +33,7 @@ import com.google.inject.Guice;
 import com.google.inject.Injector;
 import io.vertx.core.Future;
 import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesMetadata;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import 
org.apache.cassandra.sidecar.common.request.LiveMigrationDataCopyRequest;
@@ -40,6 +41,7 @@ import 
org.apache.cassandra.sidecar.common.response.LiveMigrationTaskResponse;
 import org.apache.cassandra.sidecar.config.LiveMigrationConfiguration;
 import org.apache.cassandra.sidecar.config.ServiceConfiguration;
 import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationDataCopyInProgressException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationInvalidRequestException;
 import 
org.apache.cassandra.sidecar.exceptions.LiveMigrationExceptions.LiveMigrationTaskNotFoundException;
@@ -47,6 +49,7 @@ import 
org.apache.cassandra.sidecar.handlers.livemigration.FakeLiveMigrationTask
 import org.apache.cassandra.sidecar.handlers.livemigration.LiveMigrationMap;
 import org.jetbrains.annotations.NotNull;
 
+import static 
org.apache.cassandra.sidecar.exceptions.CassandraUnavailableException.Service.CQL_AND_JMX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
@@ -161,6 +164,72 @@ public class DataCopyTaskManagerTest
         assertThat(future.result().id()).isNotEqualTo("completed-task");
     }
 
+    @Test
+    public void testCreateTaskShouldFailWhenCassandraInstanceJMXIsUp() throws InterruptedException
+    {
+        Injector injector = getInjector();
+        DataCopyTaskManager dataCopyTaskManager = 
getDataCopyTaskManager(injector);
+        InstancesMetadata instancesMetadata = 
injector.getInstance(InstancesMetadata.class);
+        InstanceMetadata destinationMetadata = 
instancesMetadata.instanceFromHost(dest1Name);
+
+        // Mocking JMX as up
+        when(destinationMetadata.delegate().isJmxUp()).thenReturn(true);
+
+        LiveMigrationDataCopyRequest request = new 
LiveMigrationDataCopyRequest(1, 1.0, 2);
+        Future<LiveMigrationTask> future = 
dataCopyTaskManager.createTask(request, dest1Name);
+        awaitForFuture(future);
+
+        assertThat(future.succeeded()).isFalse();
+        assertThat(future.failed()).isTrue();
+        assertThat(future.result()).isNull();
+        assertThat(future.cause()).isNotNull();
+        
assertThat(future.cause()).isInstanceOf(LiveMigrationInvalidRequestException.class);
+    }
+
+    @Test
+    public void testCreateTaskShouldFailWhenCassandraInstanceNativeIsUp() throws InterruptedException
+    {
+        Injector injector = getInjector();
+        DataCopyTaskManager dataCopyTaskManager = 
getDataCopyTaskManager(injector);
+        InstancesMetadata instancesMetadata = 
injector.getInstance(InstancesMetadata.class);
+        InstanceMetadata destinationMetadata = 
instancesMetadata.instanceFromHost(dest1Name);
+
+        // Mocking native (CQL) as up but JMX as down
+        when(destinationMetadata.delegate().isJmxUp()).thenReturn(false);
+        when(destinationMetadata.delegate().isNativeUp()).thenReturn(true);
+
+        LiveMigrationDataCopyRequest request = new 
LiveMigrationDataCopyRequest(1, 1.0, 2);
+        Future<LiveMigrationTask> future = 
dataCopyTaskManager.createTask(request, dest1Name);
+        awaitForFuture(future);
+
+        assertThat(future.succeeded()).isFalse();
+        assertThat(future.failed()).isTrue();
+        assertThat(future.result()).isNull();
+        assertThat(future.cause()).isNotNull();
+        
assertThat(future.cause()).isInstanceOf(LiveMigrationInvalidRequestException.class);
+    }
+
+    @Test
+    public void testCreateTaskShouldSucceedWhenCassandraAdapterIsNotAvailable() throws InterruptedException
+    {
+        Injector injector = getInjector();
+        DataCopyTaskManager dataCopyTaskManager = 
getDataCopyTaskManager(injector);
+        InstancesMetadata instancesMetadata = 
injector.getInstance(InstancesMetadata.class);
+        InstanceMetadata destinationMetadata = 
instancesMetadata.instanceFromHost(dest1Name);
+        when(destinationMetadata.delegate())
+        .thenThrow(new CassandraUnavailableException(CQL_AND_JMX, "CassandraAdapterDelegate is not available"));
+
+        LiveMigrationDataCopyRequest request = new 
LiveMigrationDataCopyRequest(1, 1.0, 2);
+        Future<LiveMigrationTask> future = 
dataCopyTaskManager.createTask(request, dest1Name);
+        awaitForFuture(future);
+
+        assertThat(future.succeeded()).isTrue();
+        assertThat(future.failed()).isFalse();
+        assertThat(future.result()).isNotNull();
+        assertThat(future.result().id()).isNotNull();
+        assertThat(future.cause()).isNull();
+    }
+
     @Test
     public void testGetTaskSuccess()
     {
@@ -324,7 +393,7 @@ public class DataCopyTaskManagerTest
         CountDownLatch latch = new CountDownLatch(1);
         future.onComplete(res -> latch.countDown());
 
-        latch.await(100, TimeUnit.MILLISECONDS);
+        latch.await(2, TimeUnit.SECONDS);
     }
 
     private DataCopyTaskManager getDataCopyTaskManager(Injector injector)
@@ -388,17 +457,21 @@ public class DataCopyTaskManagerTest
             
when(mockInstancesMetadata.instanceFromHost(dest1Name)).thenReturn(mockDest1InstanceMeta);
             when(mockDest1InstanceMeta.id()).thenReturn(dest1Id);
             
when(mockDest1InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+            
when(mockDest1InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
 
             
when(mockInstancesMetadata.instanceFromHost(dest2Name)).thenReturn(mockDest2InstanceMeta);
             when(mockDest2InstanceMeta.id()).thenReturn(dest2Id);
             
when(mockDest2InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+            
when(mockDest2InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
 
             
when(mockInstancesMetadata.instanceFromHost(dest3Name)).thenReturn(mockDest3InstanceMeta);
             when(mockDest3InstanceMeta.id()).thenReturn(dest3Id);
             
when(mockDest3InstanceMeta.dataDirs()).thenReturn(List.of("/data1", "/data2"));
+            
when(mockDest3InstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
 
             
when(mockInstancesMetadata.instanceFromHost(source1Name)).thenReturn(mockSourceInstanceMeta);
             
when(mockSourceInstanceMeta.dataDirs()).thenReturn(List.of("/data1"));
+            
when(mockSourceInstanceMeta.delegate()).thenReturn(mock(CassandraAdapterDelegate.class));
 
             // Configure LiveMigrationTaskFactory to return fake tasks
             when(mockLiveMigrationTaskFactory.create(anyString(), 
any(LiveMigrationDataCopyRequest.class), anyString(), anyInt(), 
any(InstanceMetadata.class))).thenAnswer(invocation -> {
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
index 194683db0..7606d2088 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationFileDownloaderTest.java
@@ -1144,6 +1144,68 @@ class LiveMigrationFileDownloaderTest
         assertThat(attrs.lastModifiedTime).isEqualTo(lastModified);
     }
 
+    @Test
+    void testDownloadFailsWhenPreCheckFails(@TempDir Path tmpDir) throws InterruptedException
+    {
+        String storageDir = 
tmpDir.resolve("testDownloadFailsWhenPreCheckFails").toAbsolutePath().toString();
+        List<String> dataDirs = getDataDirList(storageDir);
+        final Consumer<OperationStatus> statusUpdater = mock(Consumer.class);
+
+        LiveMigrationFileDownloadPreCheck failingPreCheck =
+        context -> Future.failedFuture(new RuntimeException("Pre-check failed: node already joined cluster"));
+
+        Injector injector = getInjector();
+        LiveMigrationFileDownloader downloader =
+        getDownloader(injector, dummyRequest100pThreshold, 0, statusUpdater, 
storageDir, dataDirs, failingPreCheck);
+
+        Future<OperationStatus> statusFuture = downloader.downloadFiles();
+        awaitForFuture(statusFuture);
+
+        assertThat(statusFuture.isComplete()).isTrue();
+        
assertThat(statusFuture.result().state()).isEqualTo(OperationStatus.State.FAILED);
+    }
+
+    @Test
+    void testPreCheckReceivesCorrectContext(@TempDir Path tmpDir) throws InterruptedException
+    {
+        String storageDir = 
tmpDir.resolve("testPreCheckReceivesCorrectContext").toAbsolutePath().toString();
+        List<String> dataDirs = getDataDirList(storageDir);
+        final Consumer<OperationStatus> statusUpdater = mock(Consumer.class);
+
+        // Pre-check that captures the context and then fails to stop the pipeline early
+        LiveMigrationFileDownloadPreCheck.PreCheckContext[] capturedContext =
+        new LiveMigrationFileDownloadPreCheck.PreCheckContext[1];
+        LiveMigrationFileDownloadPreCheck capturingPreCheck = context -> {
+            capturedContext[0] = context;
+            return Future.failedFuture(new RuntimeException("stop here"));
+        };
+
+        Injector injector = getInjector();
+        LiveMigrationFileDownloader downloader =
+        getDownloader(injector, dummyRequest100pThreshold, 0, statusUpdater, 
storageDir, dataDirs, capturingPreCheck);
+
+        Future<OperationStatus> statusFuture = downloader.downloadFiles();
+        awaitForFuture(statusFuture);
+
+        assertThat(capturedContext[0]).isNotNull();
+        assertThat(capturedContext[0].source()).isEqualTo(SOURCE);
+        assertThat(capturedContext[0].sidecarPort()).isEqualTo(PORT);
+        
assertThat(capturedContext[0].request()).isSameAs(dummyRequest100pThreshold);
+        
assertThat(capturedContext[0].destinationInstanceMetadata()).isNotNull();
+    }
+
+    @Test
+    void testDefaultPreCheckAlwaysSucceeds()
+    {
+        LiveMigrationFileDownloadPreCheck defaultPreCheck = 
LiveMigrationFileDownloadPreCheck.DEFAULT;
+        LiveMigrationFileDownloadPreCheck.PreCheckContext mockContext =
+        mock(LiveMigrationFileDownloadPreCheck.PreCheckContext.class);
+
+        Future<Void> result = defaultPreCheck.doCheck(mockContext);
+
+        assertThat(result.succeeded()).isTrue();
+    }
+
     @Test
     void testCanDeleteWithIOException(@TempDir Path tempDir) throws IOException
     {
@@ -1372,6 +1434,40 @@ class LiveMigrationFileDownloaderTest
                                           .source(SOURCE)
                                           .port(PORT)
                                           
.executorPools(ExecutorPoolsHelper.createdSharedTestPool(vertx))
+                                          
.preCheck(LiveMigrationFileDownloadPreCheck.DEFAULT)
+                                          .build();
+    }
+
+    LiveMigrationFileDownloader getDownloader(Injector injector,
+                                              LiveMigrationDataCopyRequest 
request,
+                                              int currentIteration,
+                                              Consumer<OperationStatus> 
mockStatusUpdater,
+                                              String storageDir,
+                                              List<String> dataDirs,
+                                              
LiveMigrationFileDownloadPreCheck preCheck)
+    {
+        SidecarClientProvider sidecarClientProvider = 
injector.getInstance(SidecarClientProvider.class);
+        LiveMigrationConfiguration liveMigrationConfig = 
injector.getInstance(SidecarConfiguration.class)
+                                                                 
.liveMigrationConfiguration();
+        return LiveMigrationFileDownloader.builder()
+                                          .id(UUID.randomUUID().toString())
+                                          .vertx(vertx)
+                                          
.sidecarClient(sidecarClientProvider.get())
+                                          .request(request)
+                                          .iteration(currentIteration)
+                                          .statusUpdater(mockStatusUpdater)
+                                          
.instanceMetadata(InstanceMetadataImpl.builder()
+                                                                               
 .dataDirs(dataDirs)
+                                                                               
 .storageDir(storageDir)
+                                                                               
 .metricRegistry(new MetricRegistry())
+                                                                               
 .id(1)
+                                                                               
 .storagePort(7000)
+                                                                               
 .build())
+                                          
.liveMigrationConfiguration(liveMigrationConfig)
+                                          .source(SOURCE)
+                                          .port(PORT)
+                                          
.executorPools(ExecutorPoolsHelper.createdSharedTestPool(vertx))
+                                          .preCheck(preCheck)
                                           .build();
     }
 
diff --git a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
index 4976867c7..1ebe83ed2 100644
--- a/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
+++ b/server/src/test/java/org/apache/cassandra/sidecar/livemigration/LiveMigrationTaskImplTest.java
@@ -62,7 +62,7 @@ class LiveMigrationTaskImplTest
         ExecutorPools executorPools = 
ExecutorPoolsHelper.createdSharedTestPool(vertx);
 
         return new LiveMigrationTaskImpl(vertx, executorPools, 
sidecarClientProvider, liveMigrationConfiguration,
-                                         id, request, SOURCE, PORT, 
instanceMetadata);
+                                         id, request, SOURCE, PORT, 
instanceMetadata, LiveMigrationFileDownloadPreCheck.DEFAULT);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to