Repository: incubator-htrace Updated Branches: refs/heads/master 878e4e0a9 -> 8eca1757e
HTRACE-358. Provide convenience wrapper around ScheduledExecutorService (Mike Drob via Colin P. McCabe) Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/8eca1757 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/8eca1757 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/8eca1757 Branch: refs/heads/master Commit: 8eca1757eecbeb78b12794d0ec98a605b6df7b1b Parents: 878e4e0 Author: Colin P. McCabe <[email protected]> Authored: Fri Aug 26 11:35:25 2016 -0700 Committer: Colin P. McCabe <[email protected]> Committed: Fri Aug 26 11:35:25 2016 -0700 ---------------------------------------------------------------------- .../core/ScheduledTraceExecutorService.java | 66 ++++++++++++++++++++ .../htrace/core/TraceExecutorService.java | 28 +++++++-- .../java/org/apache/htrace/core/Tracer.java | 11 ++++ .../apache/htrace/core/TestTraceExecutor.java | 66 ++++++++++++++++++++ 4 files changed, 166 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8eca1757/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java new file mode 100644 index 0000000..e783561 --- /dev/null +++ b/htrace-core4/src/main/java/org/apache/htrace/core/ScheduledTraceExecutorService.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.htrace.core; + +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * A convenience wrapper around a {@link ScheduledExecutorService} for + * automatically propagating trace scopes to executable tasks. + * <p> + * Recurring tasks will use independent scopes per execution, but will all be + * tied to the same parent scope (if any). + */ +public class ScheduledTraceExecutorService extends TraceExecutorService + implements ScheduledExecutorService { + final ScheduledExecutorService impl; + + ScheduledTraceExecutorService(Tracer tracer, String scopeName, + ScheduledExecutorService impl) { + super(tracer, scopeName, impl); + this.impl = impl; + } + + @Override + public ScheduledFuture<?> schedule(Runnable command, long delay, + TimeUnit unit) { + return impl.schedule(wrap(command), delay, unit); + } + + @Override + public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, + TimeUnit unit) { + return impl.schedule(wrap(callable), delay, unit); + } + + @Override + public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, + long initialDelay, long period, TimeUnit unit) { + return impl.scheduleAtFixedRate(wrap(command), initialDelay, period, unit); + } + + @Override + public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, + long initialDelay, long delay, TimeUnit unit) { + return impl.scheduleWithFixedDelay(wrap(command), initialDelay, delay, + unit); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8eca1757/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java index 81e31ea..592f354 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/TraceExecutorService.java @@ -26,6 +26,10 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +/** + * A convenience wrapper around an {@link ExecutorService} for automatically + * propagating trace scopes to executable tasks. + */ public class TraceExecutorService implements ExecutorService { private final Tracer tracer; private final String scopeName; @@ -40,7 +44,7 @@ public class TraceExecutorService implements ExecutorService { @Override public void execute(Runnable command) { - impl.execute(tracer.wrap(command, scopeName)); + impl.execute(wrap(command)); } @Override @@ -71,24 +75,38 @@ public class TraceExecutorService implements ExecutorService { @Override public <T> Future<T> submit(Callable<T> task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); } @Override public <T> Future<T> submit(Runnable task, T result) { - return impl.submit(tracer.wrap(task, scopeName), result); + return impl.submit(wrap(task), result); } @Override public Future<?> submit(Runnable task) { - return impl.submit(tracer.wrap(task, scopeName)); + return impl.submit(wrap(task)); + } + + /* + * Intended for internal use only. + */ + Runnable wrap(Runnable runnable) { + return tracer.wrap(runnable, scopeName); + } + + /* + * Intended for internal use only. + */ + <V> Callable<V> wrap(Callable<V> callable) { + return tracer.wrap(callable, scopeName); } private <T> Collection<? extends Callable<T>> wrapCollection( Collection<? extends Callable<T>> tasks) { List<Callable<T>> result = new ArrayList<Callable<T>>(); for (Callable<T> task : tasks) { - result.add(tracer.wrap(task, scopeName)); + result.add(wrap(task)); } return result; } http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8eca1757/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java index f78e0a0..0ca4d1d 100644 --- a/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java +++ b/htrace-core4/src/main/java/org/apache/htrace/core/Tracer.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -474,6 +475,16 @@ public class Tracer implements Closeable { return new TraceExecutorService(this, scopeName, impl); } + public ScheduledTraceExecutorService newTraceExecutorService( + ScheduledExecutorService impl) { + return newTraceExecutorService(impl, null); + } + + public ScheduledTraceExecutorService newTraceExecutorService( + ScheduledExecutorService impl, String scopeName) { + return new ScheduledTraceExecutorService(this, scopeName, impl); + } + public TracerPool getTracerPool() { if (tracerPool == null) { throwClientError(toString() + " is closed."); http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/8eca1757/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java ---------------------------------------------------------------------- diff --git a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java index dbdd27c..1bd18f7 100644 --- a/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java +++ b/htrace-core4/src/test/java/org/apache/htrace/core/TestTraceExecutor.java @@ -17,15 +17,23 @@ package org.apache.htrace.core; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.apache.htrace.core.Tracer.Builder; import org.junit.Test; public class TestTraceExecutor { @@ -88,6 +96,64 @@ public class TestTraceExecutor { } } + @Test + public void testScheduledExecutor() throws Exception { + final int TASK_COUNT = 3; + final int DELAY = 500; + + HTraceConfiguration conf = HTraceConfiguration.fromKeyValuePairs( + Tracer.SAMPLER_CLASSES_KEY, AlwaysSampler.class.getName()); + + ScheduledExecutorService ses = null; + Builder builder = new Tracer.Builder("TestTraceExecutor").conf(conf); + try (Tracer tracer = builder.build()) { + final ThreadFactory tf = new NamingThreadFactory(); + ses = Executors.newScheduledThreadPool(TASK_COUNT, tf); + ses = tracer.newTraceExecutorService(ses); + + final CountDownLatch startLatch = new CountDownLatch(TASK_COUNT); + final CountDownLatch continueLatch = new CountDownLatch(1); + Callable<String> task = new Callable<String>() { + @Override + public String call() throws InterruptedException { + startLatch.countDown(); + // Prevent any task from exiting until every task has started + assertTrue(continueLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + // Annotate on the presumed child trace + Tracer.getCurrentSpan().addTimelineAnnotation( + Thread.currentThread().getName()); + return Tracer.getCurrentSpan().getDescription(); + } + }; + + try (TraceScope scope = tracer.newScope("TestRunnable")) { + Collection<Future<String>> futures = new ArrayList<>(); + + for (int i = 0; i < TASK_COUNT; i++) { + futures.add(ses.schedule(task, DELAY, TimeUnit.MILLISECONDS)); + } + + // Wait for all tasks to start + assertTrue(startLatch.await(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + continueLatch.countDown(); + // Collect the expected results + Collection<String> results = new HashSet<>(); + for (Future<String> future : futures) { + results.add(future.get(WAIT_TIME_SECONDS, TimeUnit.SECONDS)); + } + + assertTrue("Timeline Annotations should have gone to child traces.", + Tracer.getCurrentSpan().getTimelineAnnotations().isEmpty()); + assertEquals("Duplicated child span descriptions.", TASK_COUNT, + results.size()); + } + } finally { + if (ses != null) { + ses.shutdown(); + } + } + } + /* * Inspired by org.apache.solr.util.DefaultSolrThreadFactory */
