This is an automated email from the ASF dual-hosted git repository. adriancole pushed a commit to branch error-io-on-assembly in repository https://gitbox.apache.org/repos/asf/incubator-zipkin.git
commit 8300976933d3f6700c7dec8bf5f82754a5699e56 Author: Adrian Cole <[email protected]> AuthorDate: Fri May 10 17:57:51 2019 +0800 Makes it an error to store during assembly of a call Before this, there was some extra code in the throttle package handling a bug in our in memory storage. This fixes that and removes the extra code. See #2502 --- .../server/internal/throttle/ThrottledCall.java | 59 +++++------------- .../throttle/ThrottledStorageComponent.java | 4 +- .../server/internal/throttle/ThrottledCallTest.kt | 72 ++++++++-------------- .../src/main/java/zipkin2/storage/ITSpanStore.java | 56 +++++++++++++++++ .../main/java/zipkin2/storage/InMemoryStorage.java | 39 +++++++++++- .../java/zipkin2/storage/InMemoryStorageTest.java | 14 ++--- 6 files changed, 142 insertions(+), 102 deletions(-) diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java index f43d61e..67aa71d 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledCall.java @@ -24,10 +24,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import java.util.function.Supplier; import zipkin2.Call; import zipkin2.Callback; -import zipkin2.storage.InMemoryStorage; /** * {@link Call} implementation that is backed by an {@link ExecutorService}. The ExecutorService @@ -41,39 +39,21 @@ import zipkin2.storage.InMemoryStorage; * * @see ThrottledStorageComponent */ -final class ThrottledCall<V> extends Call<V> { +final class ThrottledCall<V> extends Call.Base<V> { final ExecutorService executor; final Limiter<Void> limiter; - final Listener limitListener; - /** - * supplier call needs to be supplied later to avoid having it take action when it is created - * (like {@link InMemoryStorage} and thus avoid being throttled. - */ - final Supplier<? extends Call<V>> supplier; - volatile Call<V> delegate; - volatile boolean canceled; - - public ThrottledCall(ExecutorService executor, Limiter<Void> limiter, - Supplier<? extends Call<V>> supplier) { + final Call<V> delegate; + + ThrottledCall(ExecutorService executor, Limiter<Void> limiter, Call<V> delegate) { this.executor = executor; this.limiter = limiter; - this.limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); - this.supplier = supplier; + this.delegate = delegate; } - // TODO: refactor this when in-memory no longer executes storage ops during assembly time - ThrottledCall(ThrottledCall<V> other) { - this(other.executor, other.limiter, - other.delegate == null ? other.supplier : () -> other.delegate.clone()); - } + @Override protected V doExecute() throws IOException { + Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); - // TODO: we cannot currently extend Call.Base as tests execute the call multiple times, - // which is invalid as calls are one-shot. It isn't worth refactoring until we refactor out - // the need for assembly time throttling (fix to in-memory storage) - @Override public V execute() throws IOException { try { - delegate = supplier.get(); - // Make sure we throttle Future<V> future = executor.submit(() -> { String oldName = setCurrentThreadName(delegate.toString()); @@ -115,9 +95,11 @@ final class ThrottledCall<V> extends Call<V> { } } - @Override public void enqueue(Callback<V> callback) { + @Override protected void doEnqueue(Callback<V> callback) { + Listener limitListener = limiter.acquire(null).orElseThrow(RejectedExecutionException::new); + try { - executor.execute(new QueuedCall(callback)); + executor.execute(new QueuedCall(callback, limitListener)); } catch (RuntimeException | Error e) { propagateIfFatal(e); // Ignoring in all cases here because storage itself isn't saying we need to throttle. Though, we may still be @@ -127,21 +109,12 @@ final class ThrottledCall<V> extends Call<V> { } } - @Override public void cancel() { - canceled = true; - if (delegate != null) delegate.cancel(); - } - - @Override public boolean isCanceled() { - return canceled || (delegate != null && delegate.isCanceled()); - } - @Override public Call<V> clone() { - return new ThrottledCall<>(this); + return new ThrottledCall<>(executor, limiter, delegate.clone()); } @Override public String toString() { - return "Throttled" + supplier; + return "Throttled" + delegate; } static String setCurrentThreadName(String name) { @@ -153,17 +126,17 @@ final class ThrottledCall<V> extends Call<V> { final class QueuedCall implements Runnable { final Callback<V> callback; + final Listener limitListener; - QueuedCall(Callback<V> callback) { + QueuedCall(Callback<V> callback, Listener limitListener) { this.callback = callback; + this.limitListener = limitListener; } @Override public void run() { try { if (isCanceled()) return; - delegate = ThrottledCall.this.supplier.get(); - String oldName = setCurrentThreadName(delegate.toString()); try { enqueueAndWait(); diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java index 91e7b78..1422232 100644 --- a/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java +++ b/zipkin-server/src/main/java/zipkin2/server/internal/throttle/ThrottledStorageComponent.java @@ -104,7 +104,7 @@ public final class ThrottledStorageComponent extends StorageComponent { return "Throttled" + delegate; } - final class ThrottledSpanConsumer implements SpanConsumer { + static final class ThrottledSpanConsumer implements SpanConsumer { final SpanConsumer delegate; final Limiter<Void> limiter; final ExecutorService executor; @@ -116,7 +116,7 @@ public final class ThrottledStorageComponent extends StorageComponent { } @Override public Call<Void> accept(List<Span> spans) { - return new ThrottledCall<>(executor, limiter, () -> delegate.accept(spans)); + return new ThrottledCall<>(executor, limiter, delegate.accept(spans)); } @Override public String toString() { diff --git a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt index 00eb02f..b729e9a 100644 --- a/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt +++ b/zipkin-server/src/test/kotlin/zipkin2/server/internal/throttle/ThrottledCallTest.kt @@ -21,6 +21,7 @@ import com.netflix.concurrency.limits.Limiter.Listener import com.netflix.concurrency.limits.limit.SettableLimit import com.netflix.concurrency.limits.limiter.SimpleLimiter import org.assertj.core.api.Assertions.assertThat +import org.junit.After import org.junit.Test import org.mockito.ArgumentMatchers.any import org.mockito.Mockito @@ -41,39 +42,34 @@ import java.util.concurrent.RejectedExecutionException import java.util.concurrent.Semaphore import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -import java.util.function.Supplier -// TODO: this class re-uses Call objects which is bad as they are one-shot. This needs to be -// refactored in order to be realistic (calls throw if re-invoked, as clone() is the correct way) class ThrottledCallTest { - var limit = SettableLimit.startingAt(0) - var limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>() + val limit = SettableLimit.startingAt(0) + val limiter = SimpleLimiter.newBuilder().limit(limit).build<Void>() - inline fun <reified T : Any> mock() = Mockito.mock(T::class.java) + val numThreads = 1 + val executor = Executors.newSingleThreadExecutor(); + @After fun shutdownExecutor() = executor.shutdown() - @Test fun callCreation_isDeferred() { - val created = booleanArrayOf(false) + inline fun <reified T : Any> mock() = Mockito.mock(T::class.java) - val throttle = createThrottle(Supplier { - created[0] = true - Call.create<Void>(null) - }) + @Test fun niceToString() { + val delegate: Call<Void> = mock() + `when`(delegate.toString()).thenReturn("StoreSpansCall{}") - assertThat(created).contains(false) - throttle.execute() - assertThat(created).contains(true) + assertThat(ThrottledCall(executor, limiter, delegate)) + .hasToString("ThrottledStoreSpansCall{}") } @Test fun execute_isThrottled() { - val numThreads = 1 val queueSize = 1 val totalTasks = numThreads + queueSize + limit.limit = totalTasks val startLock = Semaphore(numThreads) val waitLock = Semaphore(totalTasks) val failLock = Semaphore(1) - val throttle = - createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) }) + val throttled = throttle(LockedCall(startLock, waitLock)) // Step 1: drain appropriate locks startLock.drainPermits() @@ -83,7 +79,7 @@ class ThrottledCallTest { // Step 2: saturate threads and fill queue val backgroundPool = Executors.newCachedThreadPool() for (i in 0 until totalTasks) { - backgroundPool.submit(Callable { throttle.execute() }) + backgroundPool.submit(Callable { throttled.clone().execute() }) } try { @@ -93,7 +89,7 @@ class ThrottledCallTest { // Step 4: submit something beyond our limits val future = backgroundPool.submit(Callable { try { - throttle.execute() + throttled.execute() } catch (e: IOException) { throw RuntimeException(e) } finally { @@ -125,7 +121,7 @@ class ThrottledCallTest { val call = FakeCall() call.overCapacity = true - val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call }) + val throttle = ThrottledCall(executor, mockLimiter(listener), call) try { throttle.execute() assertThat(true).isFalse() // should raise a RejectedExecutionException @@ -137,8 +133,7 @@ class ThrottledCallTest { @Test fun execute_ignoresLimit_whenPoolFull() { val listener: Listener = mock() - val throttle = - ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() }) + val throttle = ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall()) try { throttle.execute() assertThat(true).isFalse() // should raise a RejectedExecutionException @@ -148,14 +143,13 @@ class ThrottledCallTest { } @Test fun enqueue_isThrottled() { - val numThreads = 1 val queueSize = 1 val totalTasks = numThreads + queueSize + limit.limit = totalTasks val startLock = Semaphore(numThreads) val waitLock = Semaphore(totalTasks) - val throttle = - createThrottle(numThreads, queueSize, Supplier { LockedCall(startLock, waitLock) }) + val throttle = throttle(LockedCall(startLock, waitLock)) // Step 1: drain appropriate locks startLock.drainPermits() @@ -164,7 +158,7 @@ class ThrottledCallTest { // Step 2: saturate threads and fill queue val callback: Callback<Void> = mock() for (i in 0 until totalTasks) { - throttle.enqueue(callback) + throttle.clone().enqueue(callback) } // Step 3: make sure the threads actually started @@ -172,7 +166,7 @@ class ThrottledCallTest { try { // Step 4: submit something beyond our limits and make sure it fails - throttle.enqueue(callback) + throttle.clone().enqueue(callback) assertThat(true).isFalse() // should raise a RejectedExecutionException } catch (e: RejectedExecutionException) { @@ -187,7 +181,7 @@ class ThrottledCallTest { val call = FakeCall() call.overCapacity = true - val throttle = ThrottledCall(createPool(1, 1), mockLimiter(listener), Supplier { call }) + val throttle = ThrottledCall(executor, mockLimiter(listener), call) val latch = CountDownLatch(1) throttle.enqueue(object : Callback<Void> { override fun onSuccess(value: Void) { @@ -207,7 +201,7 @@ class ThrottledCallTest { val listener: Listener = mock() val throttle = - ThrottledCall(mockExhaustedPool(), mockLimiter(listener), Supplier { FakeCall() }) + ThrottledCall(mockExhaustedPool(), mockLimiter(listener), FakeCall()) try { throttle.enqueue(null) assertThat(true).isFalse() // should raise a RejectedExecutionException @@ -216,18 +210,7 @@ class ThrottledCallTest { } } - private fun createThrottle(delegate: Supplier<Call<Void>>): ThrottledCall<Void> { - return createThrottle(1, 1, delegate) - } - - private fun createThrottle( - poolSize: Int, - queueSize: Int, - delegate: Supplier<Call<Void>> - ): ThrottledCall<Void> { - limit.setLimit(limit.getLimit() + 1) - return ThrottledCall(createPool(poolSize, queueSize), limiter, delegate) - } + private fun throttle(delegate: Call<Void>) = ThrottledCall(executor, limiter, delegate) private class LockedCall(val startLock: Semaphore, val waitLock: Semaphore) : Call.Base<Void>() { override fun doExecute(): Void? { @@ -252,11 +235,6 @@ class ThrottledCallTest { override fun clone() = LockedCall(startLock, waitLock); } - private fun createPool(poolSize: Int, queueSize: Int): ExecutorService { - return ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.DAYS, - LinkedBlockingQueue(queueSize)) - } - private fun mockExhaustedPool(): ExecutorService { val mock: ExecutorService = mock() doThrow(RejectedExecutionException::class.java).`when`(mock).execute(any()) diff --git a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java index edac7a5..a883065 100644 --- a/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java +++ b/zipkin-tests/src/main/java/zipkin2/storage/ITSpanStore.java @@ -23,17 +23,21 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.junit.Before; import org.junit.Test; +import zipkin2.Call; +import zipkin2.Callback; import zipkin2.Endpoint; import zipkin2.Span; import zipkin2.internal.Trace; import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown; import static zipkin2.TestObjects.BACKEND; import static zipkin2.TestObjects.CLIENT_SPAN; import static zipkin2.TestObjects.DAY; @@ -106,6 +110,58 @@ public abstract class ITSpanStore { allShouldWorkWhenEmpty(); } + @Test public void consumer_properlyImplementsCallContract_execute() throws IOException { + Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0])); + + // Ensure the implementation didn't accidentally do I/O at assembly time. + assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty(); + call.execute(); + + assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()) + .containsExactly(LOTS_OF_SPANS[0]); + + try { + call.execute(); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalStateException e) { + } + + // no problem to clone a call + call.clone().execute(); + } + + @Test public void consumer_properlyImplementsCallContract_submit() throws Exception { + Call<Void> call = storage().spanConsumer().accept(asList(LOTS_OF_SPANS[0])); + // Ensure the implementation didn't accidentally do I/O at assembly time. + assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()).isEmpty(); + + CountDownLatch latch = new CountDownLatch(1); + Callback<Void> callback = new Callback<Void>() { + @Override public void onSuccess(Void value) { + latch.countDown(); + } + + @Override public void onError(Throwable t) { + latch.countDown(); + } + }; + + call.enqueue(callback); + latch.await(); + + assertThat(store().getTrace(LOTS_OF_SPANS[0].traceId()).execute()) + .containsExactly(LOTS_OF_SPANS[0]); + + try { + call.enqueue(callback); + failBecauseExceptionWasNotThrown(IllegalArgumentException.class); + } catch (IllegalStateException e) { + } + + // no problem to clone a call + call.clone().execute(); + } + /** * Ideally, storage backends can deduplicate identical documents as this will prevent some * analysis problems such as double-counting dependency links or other statistics. While this test diff --git a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java index ce72238..88d2195 100644 --- a/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java +++ b/zipkin/src/main/java/zipkin2/storage/InMemoryStorage.java @@ -30,6 +30,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import zipkin2.Call; +import zipkin2.Callback; import zipkin2.DependencyLink; import zipkin2.Endpoint; import zipkin2.Span; @@ -189,8 +190,11 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore autocompleteTags.clear(); } - @Override - public synchronized Call<Void> accept(List<Span> spans) { + @Override public Call<Void> accept(List<Span> spans) { + return new StoreSpansCall(spans); + } + + synchronized void doAccept(List<Span> spans) { int delta = spans.size(); int spansToRecover = (spansByTraceIdTimeStamp.size() + delta) - maxSpanCount; evictToRecoverSpans(spansToRecover); @@ -221,7 +225,36 @@ public final class InMemoryStorage extends StorageComponent implements SpanStore } } } - return Call.create(null /* Void == null */); + } + + final class StoreSpansCall extends Call.Base<Void> { + final List<Span> spans; + + StoreSpansCall(List<Span> spans) { + this.spans = spans; + } + + @Override protected Void doExecute() { + doAccept(spans); + return null; + } + + @Override protected void doEnqueue(Callback<Void> callback) { + try { + callback.onSuccess(doExecute()); + } catch (RuntimeException | Error e) { + Call.propagateIfFatal(e); + callback.onError(e); + } + } + + @Override public Call<Void> clone() { + return new StoreSpansCall(spans); + } + + @Override public String toString() { + return "StoreSpansCall{" + spans + "}"; + } } /** Returns the count of spans evicted. */ diff --git a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java index 24f617f..7ed0d61 100644 --- a/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java +++ b/zipkin/src/test/java/zipkin2/storage/InMemoryStorageTest.java @@ -83,9 +83,9 @@ public class InMemoryStorageTest { } /** Ensures we don't overload a partition due to key equality being conflated with order */ - @Test public void differentiatesOnTraceIdWhenTimestampEqual() { - storage.accept(asList(CLIENT_SPAN)); - storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())); + @Test public void differentiatesOnTraceIdWhenTimestampEqual() throws IOException { + storage.accept(asList(CLIENT_SPAN)).execute(); + storage.accept(asList(CLIENT_SPAN.toBuilder().traceId("333").build())).execute(); assertThat(storage).extracting("spansByTraceIdTimeStamp.delegate") .allSatisfy(map -> assertThat((Map) map).hasSize(2)); @@ -100,8 +100,8 @@ public class InMemoryStorageTest { .timestamp(TODAY * 1000) .build(); - storage.accept(asList(span)); - storage.accept(asList(span)); + storage.accept(asList(span)).execute(); + storage.accept(asList(span)).execute(); assertThat(storage.getDependencies(TODAY + 1000L, TODAY).execute()).containsOnly( DependencyLink.newBuilder().parent("kafka").child("app").callCount(1L).build() @@ -119,7 +119,7 @@ public class InMemoryStorageTest { .timestamp(TODAY * 1000) .build(); - storage.accept(asList(span1, span2)); + storage.accept(asList(span1, span2)).execute(); assertThat(storage.getSpanNames("app").execute()).containsOnly( "root" @@ -153,7 +153,7 @@ public class InMemoryStorageTest { .putTag("http.path", "/users") .timestamp(TODAY * 1000) .build(); - storage.accept(asList(span1, span2, span3, span4)); + storage.accept(asList(span1, span2, span3, span4)).execute(); assertThat(storage.getKeys().execute()).containsOnlyOnce("http.path"); assertThat(storage.getValues("http.path").execute()).containsOnlyOnce("/users");
