wchevreuil commented on code in PR #5605:
URL: https://github.com/apache/hbase/pull/5605#discussion_r1524702737
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java:
##########
@@ -115,23 +132,89 @@ public static void cancel(Path path) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
+ prefetchRunnable.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}
- public static boolean isCompleted(Path path) {
+ public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
- return future.isDone();
+ prefetchFutures.remove(path);
+ // ok to race with other cancellation attempts
+ future.cancel(true);
+ LOG.debug("Prefetch cancelled for {}", path);
}
- return true;
}
private PrefetchExecutor() {
}
+ public static boolean isCompleted(Path path) {
+ Future<?> future = prefetchFutures.get(path);
+ if (future != null) {
+ return future.isDone();
+ }
+ return true;
+ }
+
/* Visible for testing only */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}
+
+ /* Visible for testing only */
+ @RestrictedApi(explanation = "Should only be called in tests. This is a non
thread safe "
+ + "variable that would not yield accurate values when multiple readers
created and "
+ + "calling PrefetchExecutor.request concurrently", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public static long getComputedPrefetchDelay() {return computedPrefetchDelay;}
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ static Map<Path, Future<?>> getPrefetchFutures() {
+ return prefetchFutures;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ static Map<Path, Runnable> getPrefetchRunnable() {
+ return prefetchRunnable;
+ }
+
+ static boolean isPrefetchStarted() {
+ AtomicBoolean prefetchStarted = new AtomicBoolean(false);
+ for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
+ Path k = entry.getKey();
+ Future<?> v = entry.getValue();
+ ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
+ long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
+ LOG.info("Remaining wait time is : {}", waitTime);
+ if (waitTime < 0) {
+ //At this point prefetch is started
+ prefetchStarted.set(true);
+ break;
+ }
+ }
+ return prefetchStarted.get();
+ }
+
+ public static int getPrefetchDelay() {
+ return prefetchDelayMillis;
+ }
+
+ public static void loadConfiguration(Configuration conf) {
+ prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+ prefetchFutures.forEach((k, v) -> {
+ // Do not cancel the task which is about to complete
Review Comment:
That's not accurate. We are actually not cancelling running threads, but we
have no idea if it's close to completion or not.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java:
##########
@@ -1658,6 +1659,10 @@ public boolean prefetchComplete() {
return PrefetchExecutor.isCompleted(path);
}
Review Comment:
Please add "Override" annotation and Javadoc comments.
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/PrefetchExecutor.java:
##########
@@ -115,23 +132,89 @@ public static void cancel(Path path) {
// ok to race with other cancellation attempts
future.cancel(true);
prefetchFutures.remove(path);
+ prefetchRunnable.remove(path);
LOG.debug("Prefetch cancelled for {}", path);
}
}
- public static boolean isCompleted(Path path) {
+ public static void interrupt(Path path) {
Future<?> future = prefetchFutures.get(path);
if (future != null) {
- return future.isDone();
+ prefetchFutures.remove(path);
+ // ok to race with other cancellation attempts
+ future.cancel(true);
+ LOG.debug("Prefetch cancelled for {}", path);
}
- return true;
}
private PrefetchExecutor() {
}
+ public static boolean isCompleted(Path path) {
+ Future<?> future = prefetchFutures.get(path);
+ if (future != null) {
+ return future.isDone();
+ }
+ return true;
+ }
+
/* Visible for testing only */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
static ScheduledExecutorService getExecutorPool() {
return prefetchExecutorPool;
}
+
+ /* Visible for testing only */
+ @RestrictedApi(explanation = "Should only be called in tests. This is a non
thread safe "
+ + "variable that would not yield accurate values when multiple readers
created and "
+ + "calling PrefetchExecutor.request concurrently", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ public static long getComputedPrefetchDelay() {return computedPrefetchDelay;}
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ static Map<Path, Future<?>> getPrefetchFutures() {
+ return prefetchFutures;
+ }
+
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ static Map<Path, Runnable> getPrefetchRunnable() {
+ return prefetchRunnable;
+ }
+
+ static boolean isPrefetchStarted() {
+ AtomicBoolean prefetchStarted = new AtomicBoolean(false);
+ for (Map.Entry<Path, Future<?>> entry : prefetchFutures.entrySet()) {
+ Path k = entry.getKey();
+ Future<?> v = entry.getValue();
+ ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
+ long waitTime = sf.getDelay(TimeUnit.MILLISECONDS);
+ LOG.info("Remaining wait time is : {}", waitTime);
+ if (waitTime < 0) {
+ //At this point prefetch is started
+ prefetchStarted.set(true);
+ break;
+ }
+ }
+ return prefetchStarted.get();
+ }
+
+ public static int getPrefetchDelay() {
+ return prefetchDelayMillis;
+ }
+
+ public static void loadConfiguration(Configuration conf) {
+ prefetchDelayMillis = conf.getInt(PREFETCH_DELAY, 1000);
+ prefetchFutures.forEach((k, v) -> {
+ // Do not cancel the task which is about to complete
+ ScheduledFuture sf = (ScheduledFuture) prefetchFutures.get(k);
+ if (sf.getDelay(TimeUnit.MILLISECONDS) > prefetchDelayMillis) {
Review Comment:
Should test if it's > 0, no? That means the thread is still pending delay
expiration and has not started to run yet, so can be re-scheduled at no cost.
##########
hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestPrefetch.java:
##########
@@ -298,6 +299,44 @@ public void testPrefetchDoesntSkipRefs() throws Exception {
});
}
+ @Test
+ public void testOnConfigurationChange() {
+ // change PREFETCH_DELAY_ENABLE_KEY from false to true
+ conf.setInt(PREFETCH_DELAY, 40000);
+ PrefetchExecutor.loadConfiguration(conf);
+ assertTrue(getPrefetchDelay() == 40000);
+
+ // restore
+ conf.setInt(PREFETCH_DELAY, 30000);
+ PrefetchExecutor.loadConfiguration(conf);
+ assertTrue(getPrefetchDelay() == 30000);
+
+ }
+
+ @Test
+ public void testPrefetchWithDelay() throws Exception {
Review Comment:
You know your delay, and you know that no threads should be running before
this delay. You can easily get the scheduled threads from PrefetchExecutor via
prefetchFutures/prefetchRunnables and use it to test the state of the thread
prior and after the delay has elapsed. That way you can test this without the
need to expose a non-thread safe long with the "computed" delay for test
purposes only (an anti-pattern).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]