JeetKunDoug commented on code in PR #73:
URL:
https://github.com/apache/cassandra-analytics/pull/73#discussion_r1716939928
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java:
##########
@@ -182,7 +183,7 @@ public BulkSparkConf(SparkConf conf, Map<String, String> options)
// else fall back to props, and then default if neither specified
this.useOpenSsl = getBoolean(USE_OPENSSL, true);
this.ringRetryCount = getInt(RING_RETRY_COUNT, DEFAULT_RING_RETRY_COUNT);
- this.importCoordinatorTimeoutMultiplier = getInt(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 2);
+ this.importCoordinatorTimeoutMultiplier = getDouble(IMPORT_COORDINATOR_TIMEOUT_MULTIPLIER, 2.0);
Review Comment:
Given our experience with this setting and the fact that it's incredibly conservative, what are your thoughts on making it `0.5`? This is only used _after_ we've reached the desired consistency level and is a hedge against later repairs running, so it may be worth reducing.
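For a sense of scale, the sketch below plugs illustrative numbers into the same estimate that `estimateTimeoutNanos` uses (rate floor R = minSliceSize / T, largest-slice time D = maxSliceSize / R, then the larger of D and T scaled by the multiplier). `MultiplierExample` is a hypothetical helper that works in seconds rather than nanoseconds and skips the job-timeout clamp; the 50-second time-to-satisfied and the 100/400 slice sizes are made-up inputs, not measurements.

```java
// Hypothetical helper (not in the PR): the same estimate as estimateTimeoutNanos,
// expressed in seconds and without the job-timeout clamp.
final class MultiplierExample
{
    static double postClWaitSeconds(double timeToAllSatisfiedSeconds,
                                    double minSliceSize,
                                    double maxSliceSize,
                                    double multiplier)
    {
        // Slowest observed import rate: R = minSliceSize / T
        double rateFloor = minSliceSize / timeToAllSatisfiedSeconds;
        // Time the largest slice would need at that rate: D = maxSliceSize / R
        double timeForLargestSlice = maxSliceSize / rateFloor;
        // Wait for the larger of D and T, scaled by the multiplier
        return multiplier * Math.max(timeForLargestSlice, timeToAllSatisfiedSeconds);
    }

    public static void main(String[] args)
    {
        // Illustrative inputs only: T = 50 s, slice sizes 100 and 400 (any consistent unit)
        System.out.println(postClWaitSeconds(50, 100, 400, 2.0)); // 400.0
        System.out.println(postClWaitSeconds(50, 100, 400, 0.5)); // 100.0
    }
}
```

With those inputs the post-CL wait drops from 400 seconds at `2.0` to 100 seconds at `0.5`.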
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -165,6 +165,11 @@ public void waitForCompletion()
}
}
+ public boolean hasConsistencyLevelReached()
Review Comment:
maybe `hasReachedConsistencyLevel`?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -243,7 +244,7 @@ private void persist(@NotNull JavaPairRDD<DecoratedKey, Object[]> sortedRDD, Str
{
try
{
- onCloudStorageTransport(ignored -> heartbeatReporter.close());
+ onCloudStorageTransport(ignored -> simpleTaskScheduler.close());
Review Comment:
Should this still be called within `onCloudStorageTransport` now that we're
always scheduling a timeout?
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -34,53 +34,62 @@
import org.apache.cassandra.util.ThreadUtil;
-public class HeartbeatReporter implements Closeable
+/**
+ * Scheduler for simple and short tasks
+ */
+public class SimpleTaskScheduler implements Closeable
{
- private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatReporter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleTaskScheduler.class);
private final ScheduledExecutorService scheduler;
- private final Map<String, ScheduledFuture<?>> scheduledHeartbeats;
+ private final Map<String, ScheduledFuture<?>> scheduledTasks;
private boolean isClosed;
- public HeartbeatReporter()
+ public SimpleTaskScheduler()
{
- ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter");
+ ThreadFactory tf = ThreadUtil.threadFactory("bulk-write-simple-task-scheduler");
this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
- this.scheduledHeartbeats = new HashMap<>();
+ this.scheduledTasks = new HashMap<>();
this.isClosed = false;
}
- public synchronized void schedule(String name, long heartBeatIntervalMillis, Runnable heartBeat)
+ public synchronized void schedule(String name, long delayMillis, Runnable task)
Review Comment:
See my other comment about replacing the `long` delay values with `Duration`s.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -114,6 +123,25 @@ public synchronized void close()
}
}
+ private boolean isClosed()
+ {
+ if (isClosed)
+ {
+ LOGGER.info("SimpleTaskScheduler is already closed");
Review Comment:
Do we need this at `info` level in a boolean method whose only job is to report whether the scheduler is closed? If it's needed at all, it seems like it should be `debug`.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskScheduler.java:
##########
@@ -34,53 +34,62 @@
import org.apache.cassandra.util.ThreadUtil;
-public class HeartbeatReporter implements Closeable
+/**
+ * Scheduler for simple and short tasks
+ */
+public class SimpleTaskScheduler implements Closeable
{
- private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatReporter.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SimpleTaskScheduler.class);
private final ScheduledExecutorService scheduler;
- private final Map<String, ScheduledFuture<?>> scheduledHeartbeats;
+ private final Map<String, ScheduledFuture<?>> scheduledTasks;
private boolean isClosed;
- public HeartbeatReporter()
+ public SimpleTaskScheduler()
{
- ThreadFactory tf = ThreadUtil.threadFactory("Heartbeat reporter");
+ ThreadFactory tf = ThreadUtil.threadFactory("bulk-write-simple-task-scheduler");
this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
- this.scheduledHeartbeats = new HashMap<>();
+ this.scheduledTasks = new HashMap<>();
this.isClosed = false;
}
- public synchronized void schedule(String name, long heartBeatIntervalMillis, Runnable heartBeat)
+ public synchronized void schedule(String name, long delayMillis, Runnable task)
{
- if (isClosed)
+ if (isClosed() || isScheduled(name))
{
- LOGGER.info("HeartbeatReporter is already closed");
return;
}
- if (scheduledHeartbeats.containsKey(name))
+ ScheduledFuture<?> fut = scheduler.schedule(new NoThrow(name, task),
+ delayMillis,
+ TimeUnit.MILLISECONDS);
+ scheduledTasks.put(name, fut);
+ }
+
+ public synchronized void schedulePeriodic(String name, long taskIntervalMillis, Runnable task)
+ {
+ if (isClosed() || isScheduled(name))
{
- LOGGER.info("The heartbeat has been scheduled already. heartbeat={}", name);
return;
}
- ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, heartBeat),
- heartBeatIntervalMillis, // initial delay
- heartBeatIntervalMillis, // delay
+
+ ScheduledFuture<?> fut = scheduler.scheduleWithFixedDelay(new NoThrow(name, task),
+ taskIntervalMillis, // initial delay
+ taskIntervalMillis, // delay
TimeUnit.MILLISECONDS);
- scheduledHeartbeats.put(name, fut);
+ scheduledTasks.put(name, fut);
Review Comment:
Should we be concerned about overwriting an existing scheduled task? Since the name is just a string, it'd be easy to create multiple tasks with the same name. Only the last one would be kept in the scheduled-task map, while the others would keep running but would no longer be reachable anywhere (so they couldn't be retrieved or cancelled later).
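One way to make the duplicate case explicit, sketched under assumptions: `schedule` reports whether it actually scheduled anything and refuses a name whose previous future has not completed, so a second task can never silently replace (and orphan) the first. `NamedTaskScheduler` is a hypothetical stand-in for `SimpleTaskScheduler` that omits the `NoThrow` wrapper and logging; throwing an `IllegalStateException` instead of returning `false` would be an equally reasonable choice.

```java
import java.io.Closeable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for SimpleTaskScheduler; NoThrow wrapping and logging omitted.
final class NamedTaskScheduler implements Closeable
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "named-task-scheduler");
        t.setDaemon(true); // don't keep the JVM alive for pending tasks in this sketch
        return t;
    });
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new HashMap<>();
    private boolean isClosed;

    /**
     * Schedules a one-shot task. Returns false, and schedules nothing, if the scheduler is
     * closed or a task with the same name is still pending or running, so an existing
     * future is never overwritten and orphaned.
     */
    synchronized boolean schedule(String name, long delayMillis, Runnable task)
    {
        ScheduledFuture<?> existing = scheduledTasks.get(name);
        if (isClosed || (existing != null && !existing.isDone()))
        {
            return false;
        }
        scheduledTasks.put(name, scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
        return true;
    }

    /** Cancels and forgets the named task; returns true if a pending task was cancelled. */
    synchronized boolean unschedule(String name)
    {
        ScheduledFuture<?> fut = scheduledTasks.remove(name);
        return fut != null && fut.cancel(true);
    }

    @Override
    public synchronized void close()
    {
        isClosed = true;
        scheduledTasks.values().forEach(f -> f.cancel(true));
        scheduledTasks.clear();
        scheduler.shutdownNow();
    }
}
```

Allowing a name to be reused once its future `isDone()` means a one-shot task (like the job timeout) doesn't block later registrations under the same name, while a periodic task, whose future never completes on its own, keeps its name reserved until it is unscheduled.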
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskSchedulerTest.java:
##########
@@ -29,34 +29,34 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class HeartbeatReporterTest
+public class SimpleTaskSchedulerTest
{
- private HeartbeatReporter heartbeatReporter = new HeartbeatReporter();
+ private SimpleTaskScheduler simpleTaskScheduler = new SimpleTaskScheduler();
private String heartbeatName = "test-heartbeat";
@AfterEach
public void teardown()
{
- heartbeatReporter.unschedule(heartbeatName);
+ simpleTaskScheduler.unschedule(heartbeatName);
}
@Test
- public void testScheduleHeartbeat()
+ public void testSchedulePeriodicHeartbeat()
Review Comment:
```suggestion
public void testSchedulePeriodic()
```
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -219,28 +224,42 @@ private void addCompletionMonitor(CompletableFuture<?> future)
// whenComplete callback will still be invoked when the future is cancelled.
// In such case, expect CancellationException
future.whenComplete((v, t) -> {
- LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size());
-
if (t instanceof CancellationException)
{
RequestAndInstance rai = importFutures.get(future);
LOGGER.info("Cancelled import. instance={} slice={}", rai.nodeFqdn, rai.requestPayload);
return;
}
+ LOGGER.info("Completed slice requests {}/{}", completedSlices.incrementAndGet(), importFutures.keySet().size());
+
// only enter the block once
if (satisfiedSlices.get() == totalSlices
- && terminalScheduled.compareAndSet(false, true))
+ && consistencyLevelReached.compareAndSet(false, true))
{
- long timeToAllSatisfiedNanos = System.nanoTime() - waitStartNanos;
- long timeout = estimateTimeout(timeToAllSatisfiedNanos);
+ long nowNanos = System.nanoTime();
+ long timeToAllSatisfiedNanos = nowNanos - waitStartNanos;
+ long elapsedNanos = nowNanos - startTimeNanos;
+ long timeoutNanos = estimateTimeoutNanos(timeToAllSatisfiedNanos, elapsedNanos,
+ job.importCoordinatorTimeoutMultiplier(),
+ minSliceSize, maxSliceSize,
+ job.jobTimeoutSeconds());
LOGGER.info("The specified consistency level of the job has been satisfied. " +
Review Comment:
I realize this should probably be inside the `if (timeoutNanos > 0)` block; otherwise we'll log a negative number of seconds here.
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java:
##########
@@ -258,23 +277,41 @@ private void await()
validateAllRangesAreSatisfied();
}
- // calculate the timeout based on the 1) time taken to have all slices satisfied, and 2) use import rate
- private long estimateTimeout(long timeToAllSatisfiedNanos)
+ // Calculate the timeout based on the 1) time taken to have all slices satisfied, 2) use import rate and 3) jobTimeoutSeconds
+ // The effective timeout is the min of the estimate and the jobTimeoutSeconds (when specified)
+ static long estimateTimeoutNanos(long timeToAllSatisfiedNanos,
+ long elapsedNanos,
+ double importCoordinatorTimeoutMultiplier,
+ double minSliceSize,
+ double maxSliceSize,
+ long jobTimeoutSeconds)
{
- long timeout = timeToAllSatisfiedNanos;
+ long timeoutNanos = timeToAllSatisfiedNanos;
// use the minSliceSize to get the slowest import rate. R = minSliceSize / T
// use the maxSliceSize to get the highest amount of time needed for import. D = maxSliceSize / R
// Please do not combine the two statements below for readability purpose
- double estimatedRateFloor = ((double) minSliceSize) / timeToAllSatisfiedNanos;
- double timeEstimateBasedOnRate = ((double) maxSliceSize) / estimatedRateFloor;
- timeout = Math.max((long) timeEstimateBasedOnRate, timeout);
- timeout = job.importCoordinatorTimeoutMultiplier() * timeout;
- if (TimeUnit.NANOSECONDS.toHours(timeout) > 1)
+ double estimatedRateFloor = minSliceSize / timeToAllSatisfiedNanos;
+ double timeEstimateBasedOnRate = maxSliceSize / estimatedRateFloor;
+ double estimate = Math.max(timeEstimateBasedOnRate, (double) timeoutNanos);
+ timeoutNanos = (long) Math.ceil(importCoordinatorTimeoutMultiplier * estimate);
+ // consider the jobTimeoutSeconds only if it is specified
+ if (jobTimeoutSeconds != -1)
+ {
+ long remainingIdealTimeoutNanos = TimeUnit.SECONDS.toNanos(jobTimeoutSeconds) - elapsedNanos;
+ if (remainingIdealTimeoutNanos <= 0)
+ {
+ // ideal timeout has passed, and we have already achieved the desired consistency level.
+ // Do not wait any longer
+ return 0;
+ }
+ timeoutNanos = Math.min(timeoutNanos, remainingIdealTimeoutNanos);
Review Comment:
I'd drop the term "ideal" here, since we've called it "job timeout" everywhere else.
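Applying that rename, the estimate from the diff could read as below. This is a standalone restatement of `estimateTimeoutNanos` with `remainingIdealTimeoutNanos` renamed to `remainingJobTimeoutNanos`; the wrapper class `TimeoutEstimate` and the `NO_JOB_TIMEOUT` constant (standing in for the `-1` sentinel) are hypothetical names, and the logic is otherwise copied from the hunk.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around the PR's estimate, with "ideal" renamed to "job timeout".
final class TimeoutEstimate
{
    // Hypothetical name for the -1 sentinel meaning "no job timeout configured"
    static final long NO_JOB_TIMEOUT = -1;

    static long estimateTimeoutNanos(long timeToAllSatisfiedNanos,
                                     long elapsedNanos,
                                     double importCoordinatorTimeoutMultiplier,
                                     double minSliceSize,
                                     double maxSliceSize,
                                     long jobTimeoutSeconds)
    {
        // Slowest observed import rate: R = minSliceSize / T
        double estimatedRateFloor = minSliceSize / timeToAllSatisfiedNanos;
        // Longest expected import at that rate: D = maxSliceSize / R
        double timeEstimateBasedOnRate = maxSliceSize / estimatedRateFloor;
        double estimate = Math.max(timeEstimateBasedOnRate, (double) timeToAllSatisfiedNanos);
        long timeoutNanos = (long) Math.ceil(importCoordinatorTimeoutMultiplier * estimate);
        if (jobTimeoutSeconds != NO_JOB_TIMEOUT)
        {
            long remainingJobTimeoutNanos = TimeUnit.SECONDS.toNanos(jobTimeoutSeconds) - elapsedNanos;
            if (remainingJobTimeoutNanos <= 0)
            {
                // The job timeout has passed and the consistency level is already reached: stop waiting
                return 0;
            }
            // Never wait past the job timeout
            timeoutNanos = Math.min(timeoutNanos, remainingJobTimeoutNanos);
        }
        return timeoutNanos;
    }
}
```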
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/SimpleTaskSchedulerTest.java:
##########
@@ -29,34 +29,34 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class HeartbeatReporterTest
+public class SimpleTaskSchedulerTest
{
- private HeartbeatReporter heartbeatReporter = new HeartbeatReporter();
+ private SimpleTaskScheduler simpleTaskScheduler = new SimpleTaskScheduler();
private String heartbeatName = "test-heartbeat";
Review Comment:
```suggestion
private String taskName = "test-task";
```
##########
cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraBulkSourceRelation.java:
##########
@@ -434,4 +435,22 @@ private void abortRestoreJob(TransportContext.CloudStorageTransportContext conte
throw new RuntimeException("Failed to abort the restore job on Sidecar. jobId: " + jobId, e);
}
}
+
+ private void maybeScheduleTimeout()
+ {
+ long timeoutSeconds = writerContext.job().jobTimeoutSeconds();
+ if (timeoutSeconds != -1)
+ {
+ long timeoutMillis = TimeUnit.SECONDS.toMillis(timeoutSeconds);
+ LOGGER.info("Scheduled job timeout. timeoutSeconds={}", timeoutSeconds);
+ simpleTaskScheduler.schedule("Job timeout", timeoutMillis, () -> {
Review Comment:
Suggestion: make `schedule` and `schedulePeriodic` take a `Duration` instead of a `long` whose unit is conveyed only by a name like `timeoutMillis`.
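A minimal sketch of the `Duration` suggestion, assuming the same name-keyed map as the PR. `DurationTaskScheduler` and its `remainingDelay` helper are hypothetical (the helper exists only so the converted delay can be inspected), and duplicate-name handling and the `NoThrow` wrapper are left out to keep the focus on the signatures.

```java
import java.io.Closeable;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: delays are Durations, so the unit travels with the value
// instead of living in a parameter name such as timeoutMillis.
final class DurationTaskScheduler implements Closeable
{
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "duration-task-scheduler");
        t.setDaemon(true);
        return t;
    });
    private final Map<String, ScheduledFuture<?>> scheduledTasks = new HashMap<>();

    synchronized void schedule(String name, Duration delay, Runnable task)
    {
        scheduledTasks.put(name, scheduler.schedule(task, delay.toNanos(), TimeUnit.NANOSECONDS));
    }

    synchronized void schedulePeriodic(String name, Duration interval, Runnable task)
    {
        // scheduleWithFixedDelay throws IllegalArgumentException for a non-positive delay
        long nanos = interval.toNanos();
        scheduledTasks.put(name, scheduler.scheduleWithFixedDelay(task, nanos, nanos, TimeUnit.NANOSECONDS));
    }

    /** Hypothetical helper: time left before the named task next runs, or null if unknown. */
    synchronized Duration remainingDelay(String name)
    {
        ScheduledFuture<?> fut = scheduledTasks.get(name);
        return fut == null ? null : Duration.ofNanos(fut.getDelay(TimeUnit.NANOSECONDS));
    }

    @Override
    public synchronized void close()
    {
        scheduledTasks.values().forEach(f -> f.cancel(true));
        scheduledTasks.clear();
        scheduler.shutdownNow();
    }
}
```

With this shape, `maybeScheduleTimeout` could pass `Duration.ofSeconds(timeoutSeconds)` straight through instead of converting to `timeoutMillis` first.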
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java:
##########
@@ -237,6 +240,51 @@ void testAwaitShouldPassWithStuckSliceWhenClSatisfied()
"Each replica set should have a slice gets cancelled due to making no progress");
}
+ @Test
+ void testAwaitShouldBlockUntilClSatisfiedWhenIdealTimeoutIsLow()
+ {
+ // it should produce a really large estimate; the coordinator ignores it anyway due to small ideal timeout
+ when(mockJobInfo.importCoordinatorTimeoutMultiplier()).thenReturn(1000.0);
+ when(mockJobInfo.jobTimeoutSeconds()).thenReturn(1L); // low ideal timeout lead to immediate termination as soon as CL is satisfied
+ List<BlobStreamResult> resultList = buildBlobStreamResultWithNoProgressImports(/* Stuck slice per replica set */ 1, /* importTimeMillis */ 100L);
+ ImportCompletionCoordinator coordinator = ImportCompletionCoordinator.of(System.nanoTime(), mockWriterContext, dataTransferApi,
+ writerValidator, resultList, mockExtension, onCancelJob);
+ coordinator.waitForCompletion();
+ // the import should complete as soon as CL is satisfied
+ assertEquals(resultList.size(), appliedObjectKeys.getAllValues().size(),
+ "All objects should be applied and reported for exactly once");
+ assertEquals(allTestObjectKeys(), new HashSet<>(appliedObjectKeys.getAllValues()));
+ Map<CompletableFuture<Void>, RequestAndInstance> importFutures = coordinator.importFutures();
+ // all the other imports should be cancelled, given ideal timeout has exceeded
+ int cancelledImports = importFutures.keySet().stream().mapToInt(f -> f.isCancelled() ? 1 : 0).sum();
+ assertEquals(TOTAL_INSTANCES, cancelledImports,
+ "Each replica set should have a slice gets cancelled due to making no progress");
+ }
+
+ @Test
+ void testAwaitShouldBlockUntilIdealTimeoutExceeds()
+ {
+ // it should produce a really large estimate; the coordinator ignores it anyway due to small ideal timeout
+ when(mockJobInfo.importCoordinatorTimeoutMultiplier()).thenReturn(1000.0);
+ long idealTimeout = 5L;
+ when(mockJobInfo.jobTimeoutSeconds()).thenReturn(idealTimeout); // await at most 10 seconds
Review Comment:
The comment (10 seconds) and the value of `idealTimeout` (5 seconds) don't seem to agree.
##########
cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java:
##########
@@ -237,6 +240,51 @@ void testAwaitShouldPassWithStuckSliceWhenClSatisfied()
"Each replica set should have a slice gets cancelled due to making no progress");
}
+ @Test
+ void testAwaitShouldBlockUntilClSatisfiedWhenIdealTimeoutIsLow()
Review Comment:
Same comment about removing references to `Ideal`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.