Github user revans2 commented on the pull request:
https://github.com/apache/storm/pull/870#issuecomment-155475891
I did some micro-benchmark experiments.
```
public class Test extends Thread {
    final long _expectedEnd;
    final long _sleepTime;

    public Test(long ee, long st) {
        _expectedEnd = ee;
        _sleepTime = st;
    }

    public void run() {
        try {
            while (System.currentTimeMillis() < _expectedEnd) {
                Thread.sleep(_sleepTime);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String [] args) throws Exception {
        long sleepTime = 1;
        if (args.length > 0) {
            sleepTime = Long.valueOf(args[0]);
        }
        long totalTimeSec = 100;
        if (args.length > 1) {
            totalTimeSec = Long.valueOf(args[1]);
        }
        int totalThreads = 10;
        if (args.length > 2) {
            totalThreads = Integer.valueOf(args[2]);
        }
        long totalTimeMs = totalTimeSec * 1000;
        long expectedEnd = System.currentTimeMillis() + totalTimeMs;
        int ret = -1;
        try {
            Test [] tests = new Test[totalThreads];
            for (int i = 0; i < totalThreads; i++) {
                tests[i] = new Test(expectedEnd, sleepTime);
                tests[i].start();
            }
            for (int i = 0; i < totalThreads; i++) {
                tests[i].join();
            }
            ret = 0;
        } finally {
            System.exit(ret);
        }
    }
}
```
This version, with one thread per sleeper, makes my laptop unresponsive at 300 threads and a 1 ms sleep time.
```
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.Random;

public class Test extends Thread {
    public static final ScheduledThreadPoolExecutor _timer = new ScheduledThreadPoolExecutor(0);
    public static final AtomicLong _error = new AtomicLong(0);
    private long _expected;
    private final long _sleepTime;
    private final Random _rand = new Random();

    public Test(long sleepTime) {
        _sleepTime = sleepTime;
        _expected = System.currentTimeMillis() + sleepTime;
    }

    public void run() {
        long now = System.currentTimeMillis();
        long err = Math.abs(now - _expected);
        _error.addAndGet(err);
        if (_rand.nextInt(100) >= 99) {
            try {
                Thread.sleep(10);
            } catch (Exception e) {
                //ignored
            }
        }
        _expected = System.currentTimeMillis() + _sleepTime;
    }

    public static void main(String [] args) throws Exception {
        long sleepTime = 1;
        if (args.length > 0) {
            sleepTime = Long.valueOf(args[0]);
        }
        long totalTimeSec = 100;
        if (args.length > 1) {
            totalTimeSec = Long.valueOf(args[1]);
        }
        int totalThreads = 10;
        if (args.length > 2) {
            totalThreads = Integer.valueOf(args[2]);
        }
        long totalTimeMs = totalTimeSec * 1000;
        int ret = -1;
        try {
            ScheduledFuture<?> [] tests = new ScheduledFuture<?>[totalThreads];
            for (int i = 0; i < totalThreads; i++) {
                Test t = new Test(sleepTime);
                //TODO real one will need locking
                _timer.setCorePoolSize(_timer.getCorePoolSize() + 1);
                tests[i] = _timer.scheduleWithFixedDelay(t, sleepTime, sleepTime, TimeUnit.MILLISECONDS);
            }
            Thread.sleep(totalTimeMs);
            for (int i = 0; i < totalThreads; i++) {
                tests[i].cancel(true);
            }
            System.out.println("AVG Error " + (((double)_error.get())/totalThreads/totalTimeSec));
            ret = 0;
        } finally {
            System.exit(ret);
        }
    }
}
```
This version uses almost no CPU and does more or less the same thing, but with more
jitter, and it also measures how much error there is. I tried various
combinations of things to improve the performance. I now think we need to write
a custom ScheduledExecutorService that can increase and decrease the number of
threads based on how much load there is. It can purposely run multiple
tasks together, even if a more accurate clock would have woken them up at
separate times.
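
A minimal sketch of the "run multiple tasks together" idea, to make it concrete. This is not the proposed custom executor and not code from the pull request: `CoalescingTicker`, `register`, and the tick length are hypothetical names chosen for illustration. It assumes every task is fine with being woken on a shared coarse tick, and it makes no attempt to grow or shrink the thread count with load.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper for illustration only; not part of Storm or the JDK.
final class CoalescingTicker {
    // Copy-on-write so tasks can be registered while a tick is iterating.
    private final List<Runnable> tasks = new CopyOnWriteArrayList<>();
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor();

    CoalescingTicker(long tickMs) {
        // One wake-up per tick, shared by every registered task.
        timer.scheduleWithFixedDelay(this::runAll, tickMs, tickMs, TimeUnit.MILLISECONDS);
    }

    void register(Runnable task) {
        tasks.add(task);
    }

    private void runAll() {
        for (Runnable task : tasks) {
            try {
                task.run();
            } catch (RuntimeException e) {
                // An exception escaping a periodic task would suppress all
                // later ticks, so it is swallowed here to keep the batch alive.
            }
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        CoalescingTicker ticker = new CoalescingTicker(1);
        AtomicLong runs = new AtomicLong();
        // 300 logical sleepers share a single timer thread.
        for (int i = 0; i < 300; i++) {
            ticker.register(runs::incrementAndGet);
        }
        Thread.sleep(1000);
        ticker.shutdown();
        System.out.println("task runs in 1 s: " + runs.get());
    }
}
```

The trade-off is the one described above: tasks in a batch run back to back on one thread, so a slow task delays the rest of that tick, in exchange for far fewer wake-ups than one sleeping thread per task.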