JeetKunDoug commented on code in PR #73:
URL: https://github.com/apache/cassandra-analytics/pull/73#discussion_r1716939928


##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -182,7 +183,7 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
         // else fall back to props, and then default if neither specified
         this.useOpenSsl = getBoolean(USE_OPENSSL, true);
         this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
-        this.importCoordinatorTimeoutMultiplier = getInt(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 2);
+        this.importCoordinatorTimeoutMultiplier = getDouble(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 2.0);

Review Comment:
   Given our experience with this setting and the fact that it's incredibly 
conservative, what are your thoughts on making it `0.5`? It is only used 
_after_ we've reached the desired consistency level and is a hedge against 
later repairs running, so it may be worth reducing.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -165,6 +165,11 @@ public void waitForCompletion()
         }
     }
 
+    public boolean hasConsistencyLevelReached()

Review Comment:
   maybe `hasReachedConsistencyLevel`?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -243,7 +244,7 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str
         {
             try
             {
-                onCloudStorageTransport(ignored -> heartbeatReporter.close());
+                onCloudStorageTransport(ignored -> simpleTaskScheduler.close());

Review Comment:
   Should this still be called within `onCloudStorageTransport` now that we're 
always scheduling a timeout?



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -34,53 +34,62 @@
 
 import org.apache.cassandra.util.ThreadUtil;
 
-public class HeartbeatReporter implements Closeable
+/**
+ * Scheduler for simple and short tasks
+ */
+public class SimpleTaskScheduler implements Closeable
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatReporter.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleTaskScheduler.class);
 
     private final ScheduledExecutorService scheduler;
-    private final Map<String, ScheduledFuture<?>> scheduledHeartbeats;
+    private final Map<String, ScheduledFuture<?>> scheduledTasks;
     private boolean isClosed;
 
-    public HeartbeatReporter()
+    public SimpleTaskScheduler()
     {
-        ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter");
+        ThreadFactory tf = ThreadUtil.threadFactory("bulk-write-simple-task-scheduler");
         this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
-        this.scheduledHeartbeats = new HashMap<>();
+        this.scheduledTasks = new HashMap<>();
         this.isClosed = false;
     }
 
-    public synchronized void schedule(String name, long heartBeatIntervalMillis, Runnable heartBeat)
+    public synchronized void schedule(String name, long delayMillis, Runnable task)

Review Comment:
   See my other comment about replacing `long` delay values with `Duration`s.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -114,6 +123,25 @@ public synchronized void close()
         }
     }
 
+    private boolean isClosed()
+    {
+        if (isClosed)
+        {
+            LOGGER.info("SimpleTaskScheduler is already closed");

Review Comment:
   Do we need this at `info` level for a boolean method that is only _supposed_ to 
return whether the scheduler is closed? If the message is necessary at all, 
it seems like it should be `debug`.



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -34,53 +34,62 @@
 
 import org.apache.cassandra.util.ThreadUtil;
 
-public class HeartbeatReporter implements Closeable
+/**
+ * Scheduler for simple and short tasks
+ */
+public class SimpleTaskScheduler implements Closeable
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatReporter.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SimpleTaskScheduler.class);
 
     private final ScheduledExecutorService scheduler;
-    private final Map<String, ScheduledFuture<?>> scheduledHeartbeats;
+    private final Map<String, ScheduledFuture<?>> scheduledTasks;
     private boolean isClosed;
 
-    public HeartbeatReporter()
+    public SimpleTaskScheduler()
     {
-        ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter");
+        ThreadFactory tf = ThreadUtil.threadFactory("bulk-write-simple-task-scheduler");
         this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
-        this.scheduledHeartbeats = new HashMap<>();
+        this.scheduledTasks = new HashMap<>();
         this.isClosed = false;
     }
 
-    public synchronized void schedule(String name, long heartBeatIntervalMillis, Runnable heartBeat)
+    public synchronized void schedule(String name, long delayMillis, Runnable task)
     {
-        if (isClosed)
+        if (isClosed() || isScheduled(name))
         {
-            LOGGER.info("HeartbeatReporter is already closed");
             return;
         }
 
-        if (scheduledHeartbeats.containsKey(name))
+        ScheduledFuture<?> fut = scheduler.schedule(new NoThrow(name, task),
+                                                    delayMillis,
+                                                    TimeUnit.MILLISECONDS);
+        scheduledTasks.put(name, fut);
+    }
+
+    public synchronized void schedulePeriodic(String name, long taskIntervalMillis, Runnable task)
+    {
+        if (isClosed() || isScheduled(name))
         {
-            LOGGER.info("The heartbeat has been scheduled already. heartbeat={}", name);
             return;
         }
-        ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, heartBeat),
-                                                                  heartBeatIntervalMillis, // initial delay
-                                                                  heartBeatIntervalMillis, // delay
+
+        ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, task),
+                                                                  taskIntervalMillis, // initial delay
+                                                                  taskIntervalMillis, // delay
                                                                   TimeUnit.MILLISECONDS);
-        scheduledHeartbeats.put(name, fut);
+        scheduledTasks.put(name, fut);

Review Comment:
   Should we be concerned about overwriting an existing scheduled task? Given 
the name is just a string, it'd be easy to potentially create multiple tasks 
with the same name, at which point only the last one would be in the scheduled 
task map, but others would be running and no longer reachable anywhere (so they 
can't be retrieved/cancelled later) ...
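
   A minimal sketch of the invariant in question, assuming a standalone class 
(`NamedTaskRegistrySketch` and its method names are hypothetical, not the PR's 
code): a name is registered at most once, so no running future can become 
unreachable. The hunk above already guards with `isClosed() || isScheduled(name)`; 
this only isolates the check-then-put step under a single lock.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the SimpleTaskScheduler from the PR.
final class NamedTaskRegistrySketch implements AutoCloseable
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new HashMap<>();

    // Returns false (and schedules nothing) when the name is already taken,
    // so an earlier future is never overwritten and orphaned.
    synchronized boolean schedulePeriodic(String name, long intervalMillis, Runnable task)
    {
        if (scheduledTasks.containsKey(name))
        {
            return false;
        }
        ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(task, intervalMillis, intervalMillis,
                                                                  TimeUnit.MILLISECONDS);
        scheduledTasks.put(name, fut);
        return true;
    }

    // Removes the entry and cancels its future; false when the name is unknown.
    synchronized boolean unschedule(String name)
    {
        ScheduledFuture<?> fut = scheduledTasks.remove(name);
        return fut != null && fut.cancel(false);
    }

    synchronized int scheduledCount()
    {
        return scheduledTasks.size();
    }

    @Override
    public synchronized void close()
    {
        scheduler.shutdownNow();
        scheduledTasks.clear();
    }
}
```

   Returning a boolean lets the caller see that a duplicate name was rejected, 
rather than the call silently doing nothing.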



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskSchedulerTest.java:
##########
@@ -29,34 +29,34 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class HeartbeatReporterTest
+public class SimpleTaskSchedulerTest
 {
-    private HeartbeatReporter heartbeatReporter = new HeartbeatReporter();
+    private SimpleTaskScheduler simpleTaskScheduler = new SimpleTaskScheduler();
     private String heartbeatName = "test-heartbeat";
 
     @AfterEach
     public void teardown()
     {
-        heartbeatReporter.unschedule(heartbeatName);
+        simpleTaskScheduler.unschedule(heartbeatName);
     }
 
     @Test
-    public void testScheduleHeartbeat()
+    public void testSchedulePeriodicHeartbeat()

Review Comment:
   ```suggestion
       public void testSchedulePeriodic()
   ```



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -219,28 +224,42 @@ private void addCompletionMonitor(CompletableFuture<?> future)
         // whenComplete callback will still be invoked when the future is cancelled.
         // In such case, expect CancellationException
         future.whenComplete((v, t) -> {
-            LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size());
-
             if (t instanceof CancellationException)
             {
                 RequestAndInstance rai = importFutures.get(future);
                 LOGGER.info("Cancelled import. instance={} slice={}", rai.nodeFqdn, rai.requestPayload);
                 return;
             }
 
+            LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size());
+
             // only enter the block once
             if (satisfiedSlices.get() == totalSlices
-                && terminalScheduled.compareAndSet(false, true))
+                && consistencyLevelReached.compareAndSet(false, true))
             {
-                long timeToAllSatisfiedNanos = System.nanoTime() - waitStartNanos;
-                long timeout = estimateTimeout(timeToAllSatisfiedNanos);
+                long nowNanos = System.nanoTime();
+                long timeToAllSatisfiedNanos = nowNanos - waitStartNanos;
+                long elapsedNanos = nowNanos - startTimeNanos;
+                long timeoutNanos = estimateTimeoutNanos(timeToAllSatisfiedNanos, elapsedNanos,
+                                                         job.importCoordinatorTimeoutMultiplier(),
+                                                         minSliceSize, maxSliceSize,
+                                                         job.jobTimeoutSeconds());
                 LOGGER.info("The specified consistency level of the job has been satisfied. " +

Review Comment:
   I realize this should probably be inside the `if (timeoutNanos > 0)` block; 
otherwise we'll be logging some negative number of seconds here.
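
   A small sketch of that guard, under the assumption that the wait message is 
only meaningful for a positive timeout. `TimeoutLogSketch`, `waitMessage`, and 
the message text are hypothetical stand-ins for the PR's `LOGGER.info` call.

```java
import java.util.Optional;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; the real code logs through LOGGER.
final class TimeoutLogSketch
{
    // Builds the message only when there is a positive amount of time left to wait,
    // so a zero or negative value is never reported as a number of seconds.
    static Optional<String> waitMessage(long timeoutNanos)
    {
        if (timeoutNanos > 0)
        {
            long seconds = TimeUnit.NANOSECONDS.toSeconds(timeoutNanos);
            return Optional.of("Waiting up to " + seconds + " seconds for the remaining imports");
        }
        return Optional.empty();
    }
}
```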



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -258,23 +277,41 @@ private void await()
         validateAllRangesAreSatisfied();
     }
 
-    // calculate the timeout based on the 1) time taken to have all slices satisfied, and 2) use import rate
-    private long estimateTimeout(long timeToAllSatisfiedNanos)
+    // Calculate the timeout based on the 1) time taken to have all slices satisfied, 2) use import rate and 3) jobTimeoutSeconds
+    // The effective timeout is the min of the estimate and the jobTimeoutSeconds (when specified)
+    static long estimateTimeoutNanos(long timeToAllSatisfiedNanos,
+                                     long elapsedNanos,
+                                     double importCoordinatorTimeoutMultiplier,
+                                     double minSliceSize,
+                                     double maxSliceSize,
+                                     long jobTimeoutSeconds)
     {
-        long timeout = timeToAllSatisfiedNanos;
+        long timeoutNanos = timeToAllSatisfiedNanos;
         // use the minSliceSize to get the slowest import rate. R = minSliceSize / T
         // use the maxSliceSize to get the highest amount of time needed for import. D = maxSliceSize / R
         // Please do not combine the two statements below for readability purpose
-        double estimatedRateFloor = ((double) minSliceSize) / timeToAllSatisfiedNanos;
-        double timeEstimateBasedOnRate = ((double) maxSliceSize) / estimatedRateFloor;
-        timeout = Math.max((long) timeEstimateBasedOnRate, timeout);
-        timeout = job.importCoordinatorTimeoutMultiplier() * timeout;
-        if (TimeUnit.NANOSECONDS.toHours(timeout) > 1)
+        double estimatedRateFloor = minSliceSize / timeToAllSatisfiedNanos;
+        double timeEstimateBasedOnRate = maxSliceSize / estimatedRateFloor;
+        double estimate = Math.max(timeEstimateBasedOnRate, (double) timeoutNanos);
+        timeoutNanos = (long) Math.ceil(importCoordinatorTimeoutMultiplier * estimate);
+        // consider the jobTimeoutSeconds only if it is specified
+        if (jobTimeoutSeconds != -1)
+        {
+            long remainingIdealTimeoutNanos = TimeUnit.SECONDS.toNanos(jobTimeoutSeconds) - elapsedNanos;
+            if (remainingIdealTimeoutNanos <= 0)
+            {
+                // ideal timeout has passed, and we have already achieved the desired consistency level.
+                // Do not wait any longer
+                return 0;
+            }
+            timeoutNanos = Math.min(timeoutNanos, remainingIdealTimeoutNanos);

Review Comment:
   I'd remove the usage of the term "ideal" here as we've essentially just 
called it "job timeout" everywhere else.
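
   For illustration, a self-contained sketch of the same calculation with the 
"job timeout" naming. It follows the shape of the hunk above; the class name 
`TimeoutEstimateSketch` and the local `remainingJobTimeoutNanos` are 
hypothetical, and the final `return` is assumed since the hunk is cut off 
before it.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; mirrors estimateTimeoutNanos from the hunk above.
final class TimeoutEstimateSketch
{
    static long estimateTimeoutNanos(long timeToAllSatisfiedNanos,
                                     long elapsedNanos,
                                     double importCoordinatorTimeoutMultiplier,
                                     double minSliceSize,
                                     double maxSliceSize,
                                     long jobTimeoutSeconds)
    {
        // Slowest observed import rate: R = minSliceSize / T
        double estimatedRateFloor = minSliceSize / timeToAllSatisfiedNanos;
        // Longest time an import could need at that rate: D = maxSliceSize / R
        double timeEstimateBasedOnRate = maxSliceSize / estimatedRateFloor;
        double estimate = Math.max(timeEstimateBasedOnRate, (double) timeToAllSatisfiedNanos);
        long timeoutNanos = (long) Math.ceil(importCoordinatorTimeoutMultiplier * estimate);
        // Consider the job timeout only if it is specified (-1 means unset)
        if (jobTimeoutSeconds != -1)
        {
            long remainingJobTimeoutNanos = TimeUnit.SECONDS.toNanos(jobTimeoutSeconds) - elapsedNanos;
            if (remainingJobTimeoutNanos <= 0)
            {
                // Job timeout has passed and the consistency level is already reached: do not wait
                return 0;
            }
            timeoutNanos = Math.min(timeoutNanos, remainingJobTimeoutNanos);
        }
        return timeoutNanos;
    }
}
```

   With a slice-size ratio of 2 and no job timeout, a multiplier of `2.0` waits 
four times the time-to-satisfied, while the `0.5` floated in the first comment 
waits exactly the time-to-satisfied.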



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskSchedulerTest.java:
##########
@@ -29,34 +29,34 @@
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class HeartbeatReporterTest
+public class SimpleTaskSchedulerTest
 {
-    private HeartbeatReporter heartbeatReporter = new HeartbeatReporter();
+    private SimpleTaskScheduler simpleTaskScheduler = new SimpleTaskScheduler();
     private String heartbeatName = "test-heartbeat";

Review Comment:
   ```suggestion
       private String taskName = "test-task";
   ```



##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -434,4 +435,22 @@ private void abortRestoreJob(TransportContext.CloudStorageTransportContext conte
             throw new RuntimeException("Failed to abort the restore job on Sidecar. jobId: " + jobId, e);
         }
     }
+
+    private void maybeScheduleTimeout()
+    {
+        long timeoutSeconds = writerContext.job().jobTimeoutSeconds();
+        if (timeoutSeconds != -1)
+        {
+            long timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSeconds);
+            LOGGER.info("Scheduled job timeout. timeoutSeconds={}", timeoutSeconds);
+            simpleTaskScheduler.schedule("Job timeout", timeoutMillis, () -> {

Review Comment:
   Suggestion: make `schedule` and `schedulePeriodic` take a `Duration` instead 
of a `long` that is merely named `timeoutMillis`.
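
   Roughly what I have in mind, as a sketch only. `DurationSchedulerSketch` is a 
hypothetical stand-in for `SimpleTaskScheduler`, and the name bookkeeping, 
`NoThrow` wrapper, and closed check are left out to keep the signature change 
in focus.

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the SimpleTaskScheduler from the PR.
final class DurationSchedulerSketch implements AutoCloseable
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // One-shot task: the unit travels with the value, so callers cannot mix up millis and seconds.
    // The name parameter is kept only for signature parity and is unused in this sketch.
    ScheduledFuture<?> schedule(String name, Duration delay, Runnable task)
    {
        return scheduler.schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS);
    }

    // Periodic task: the interval is used as both the initial delay and the delay between runs.
    ScheduledFuture<?> schedulePeriodic(String name, Duration interval, Runnable task)
    {
        long nanos = interval.toNanos();
        return scheduler.scheduleWithFixedDelay(task, nanos, nanos, TimeUnit.NANOSECONDS);
    }

    @Override
    public void close()
    {
        scheduler.shutdownNow();
    }
}
```

   A call site would then read, for example, 
`schedule("Job timeout", Duration.ofSeconds(timeoutSeconds), task)`, with no 
manual `TimeUnit.SECONDS.toMillis` conversion.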



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java:
##########
@@ -237,6 +240,51 @@ void testAwaitShouldPassWithStuckSliceWhenClSatisfied()
                      "Each replica set should have a slice gets cancelled due to making no progress");
     }
 
+    @Test
+    void testAwaitShouldBlockUntilClSatisfiedWhenIdealTimeoutIsLow()
+    {
+        // it should produce a really large estimate; the coordinator ignores it anyway due to small ideal timeout
+        when(mockJobInfo.importCoordinatorTimeoutMultiplier()).thenReturn(1000.0);
+        when(mockJobInfo.jobTimeoutSeconds()).thenReturn(1L); // low ideal timeout lead to immediate termination as soon as CL is satisfied
+        List<BlobStreamResult> resultList = buildBlobStreamResultWithNoProgressImports(/* Stuck slice per replica set */ 1, /* importTimeMillis */ 100L);
+        ImportCompletionCoordinator coordinator = ImportCompletionCoordinator.of(System.nanoTime(), mockWriterContext, dataTransferApi,
+                                                                                 writerValidator, resultList, mockExtension, onCancelJob);
+        coordinator.waitForCompletion();
+        // the import should complete as soon as CL is satisfied
+        assertEquals(resultList.size(), appliedObjectKeys.getAllValues().size(),
+                     "All objects should be applied and reported for exactly once");
+        assertEquals(allTestObjectKeys(), new HashSet<>(appliedObjectKeys.getAllValues()));
+        Map<CompletableFuture<Void>, RequestAndInstance> importFutures = coordinator.importFutures();
+        // all the other imports should be cancelled, given ideal timeout has exceeded
+        int cancelledImports = importFutures.keySet().stream().mapToInt(f -> f.isCancelled() ? 1 : 0).sum();
+        assertEquals(TOTAL_INSTANCES, cancelledImports,
+                     "Each replica set should have a slice gets cancelled due to making no progress");
+    }
+
+    @Test
+    void testAwaitShouldBlockUntilIdealTimeoutExceeds()
+    {
+        // it should produce a really large estimate; the coordinator ignores it anyway due to small ideal timeout
+        when(mockJobInfo.importCoordinatorTimeoutMultiplier()).thenReturn(1000.0);
+        long idealTimeout = 5L;
+        when(mockJobInfo.jobTimeoutSeconds()).thenReturn(idealTimeout); // await at most 10 seconds

Review Comment:
   The comment (10 seconds) and the value of `idealTimeout` (5 seconds) do not 
agree.



##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java:
##########
@@ -237,6 +240,51 @@ void testAwaitShouldPassWithStuckSliceWhenClSatisfied()
                      "Each replica set should have a slice gets cancelled due to making no progress");
     }
 
+    @Test
+    void testAwaitShouldBlockUntilClSatisfiedWhenIdealTimeoutIsLow()

Review Comment:
   Same comment about removing references to `Ideal`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

