This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 4dc5942daba BaseWorkerClientImpl: Don't attempt to recover from a
closed channel. (#17052)
4dc5942daba is described below
commit 4dc5942dabacfc9e458f9c742e510cb94c091c8d
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Sep 15 02:10:58 2024 -0700
BaseWorkerClientImpl: Don't attempt to recover from a closed channel.
(#17052)
* BaseWorkerClientImpl: Don't attempt to recover from a closed channel.
This patch introduces an exception type "ChannelClosedForWritesException",
which allows the BaseWorkerClientImpl to avoid retrying when the local
channel has been closed. This can happen in cases of cancellation.
* Add some test coverage.
* wip
* Add test coverage.
* Style.
---
.../apache/druid/msq/rpc/BaseWorkerClientImpl.java | 19 +-
.../druid/msq/rpc/BaseWorkerClientImplTest.java | 383 +++++++++++++++++++++
.../channel/ChannelClosedForWritesException.java | 33 ++
.../channel/ReadableByteChunksFrameChannel.java | 4 +-
.../ReadableByteChunksFrameChannelTest.java | 13 +
.../org/apache/druid/rpc/MockServiceClient.java | 4 +-
6 files changed, 448 insertions(+), 8 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/BaseWorkerClientImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/BaseWorkerClientImpl.java
index fd1a0323d0f..d6e7d412aca 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/BaseWorkerClientImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/rpc/BaseWorkerClientImpl.java
@@ -26,6 +26,7 @@ import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.channel.ChannelClosedForWritesException;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.file.FrameFileHttpResponseHandler;
import org.apache.druid.frame.file.FrameFilePartialFetch;
@@ -219,12 +220,18 @@ public abstract class BaseWorkerClientImpl implements
WorkerClient
public void onSuccess(FrameFilePartialFetch partialFetch)
{
if (partialFetch.isExceptionCaught()) {
- // Exception while reading channel. Recoverable.
- log.noStackTrace().info(
- partialFetch.getExceptionCaught(),
- "Encountered exception while reading channel [%s]",
- channel.getId()
- );
+ if (partialFetch.getExceptionCaught() instanceof
ChannelClosedForWritesException) {
+ // Channel was closed. Stop trying.
+ retVal.setException(partialFetch.getExceptionCaught());
+ return;
+ } else {
+ // Exception while reading channel. Recoverable.
+ log.noStackTrace().warn(
+ partialFetch.getExceptionCaught(),
+ "Attempting recovery after exception while reading
channel[%s]",
+ channel.getId()
+ );
+ }
}
// Empty fetch means this is the last fetch for the channel.
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/rpc/BaseWorkerClientImplTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/rpc/BaseWorkerClientImplTest.java
new file mode 100644
index 00000000000..dd8633c886f
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/rpc/BaseWorkerClientImplTest.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.rpc;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.channel.ByteTracker;
+import org.apache.druid.frame.channel.ChannelClosedForWritesException;
+import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.file.FrameFile;
+import org.apache.druid.frame.file.FrameFileHttpResponseHandler;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.testutil.FrameSequenceBuilder;
+import org.apache.druid.frame.testutil.FrameTestUtil;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.guava.Sequences;
+import org.apache.druid.msq.exec.WorkerClient;
+import org.apache.druid.msq.kernel.StageId;
+import org.apache.druid.rpc.MockServiceClient;
+import org.apache.druid.rpc.RequestBuilder;
+import org.apache.druid.rpc.ServiceClient;
+import org.apache.druid.segment.QueryableIndexCursorFactory;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.utils.CloseableUtils;
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.MatcherAssert;
+import org.jboss.netty.handler.codec.http.HttpMethod;
+import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.internal.matchers.ThrowableMessageMatcher;
+
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class BaseWorkerClientImplTest extends InitializedNullHandlingTest
+{
+ private static final String WORKER_ID = "w0";
+ /**
+ * Bytes for a {@link FrameFile} with no frames. (Not an empty array.)
+ */
+ private static byte[] NIL_FILE_BYTES;
+ /**
+ * Bytes for a {@link FrameFile} holding {@link
TestIndex#getMMappedTestIndex()}.
+ */
+ private static byte[] FILE_BYTES;
+ private static FrameReader FRAME_READER;
+
+ private ObjectMapper jsonMapper;
+ private MockServiceClient workerServiceClient;
+ private WorkerClient workerClient;
+ private ExecutorService exec;
+
+ @BeforeClass
+ public static void setupClass()
+ {
+ final QueryableIndexCursorFactory cursorFactory = new
QueryableIndexCursorFactory(TestIndex.getMMappedTestIndex());
+
+ NIL_FILE_BYTES = toFileBytes(Sequences.empty());
+ FILE_BYTES = toFileBytes(
+ FrameSequenceBuilder.fromCursorFactory(cursorFactory)
+ .frameType(FrameType.COLUMNAR)
+ .maxRowsPerFrame(10)
+ .frames()
+ );
+ FRAME_READER = FrameReader.create(cursorFactory.getRowSignature());
+ }
+
+ @AfterClass
+ public static void afterClass()
+ {
+ NIL_FILE_BYTES = null;
+ FILE_BYTES = null;
+ FRAME_READER = null;
+ }
+
+ @Before
+ public void setup()
+ {
+ jsonMapper = new DefaultObjectMapper();
+ workerServiceClient = new MockServiceClient();
+ workerClient = new TestWorkerClient(jsonMapper, workerServiceClient);
+ exec = Execs.singleThreaded(StringUtils.encodeForFormat("exec-for-" +
getClass().getName()) + "-%s");
+ }
+
+ @After
+ public void tearDown() throws InterruptedException
+ {
+ workerServiceClient.verify();
+ exec.shutdownNow();
+ if (!exec.awaitTermination(1, TimeUnit.MINUTES)) {
+ throw new ISE("Timed out waiting for exec to finish");
+ }
+ }
+
+ @Test
+ public void test_fetchChannelData_empty() throws Exception
+ {
+ workerServiceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(false),
+ NIL_FILE_BYTES
+ ).expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=" +
NIL_FILE_BYTES.length)
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(true),
+ ByteArrays.EMPTY_ARRAY
+ );
+
+ // Perform the test.
+ final StageId stageId = new StageId("xyz", 1);
+ final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("testChannel", false);
+ final Future<List<List<Object>>> framesFuture = readChannelAsync(channel);
+
+ Assert.assertFalse(workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get());
+ Assert.assertTrue(workerClient.fetchChannelData(WORKER_ID, stageId, 2,
NIL_FILE_BYTES.length, channel).get());
+ channel.doneWriting(); // Caller is expected to call doneWriting after
fetchChannelData returns true.
+
+ Assert.assertEquals(
+ 0,
+ framesFuture.get().size()
+ );
+ }
+
+ @Test
+ public void test_fetchChannelData_empty_intoClosedChannel()
+ {
+ workerServiceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(false),
+ NIL_FILE_BYTES
+ );
+
+ // Perform the test.
+ final StageId stageId = new StageId("xyz", 1);
+ final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("testChannel", false);
+ channel.close(); // ReadableFrameChannel's close() method.
+
+ final ExecutionException e = Assert.assertThrows(
+ ExecutionException.class,
+ () -> workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get()
+ );
+
+ MatcherAssert.assertThat(
+ e.getCause(),
+ CoreMatchers.instanceOf(ChannelClosedForWritesException.class)
+ );
+ }
+
+ @Test
+ public void test_fetchChannelData_empty_retry500() throws Exception
+ {
+ workerServiceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.INTERNAL_SERVER_ERROR,
+ ImmutableMap.of(),
+ ByteArrays.EMPTY_ARRAY
+ ).expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(false),
+ NIL_FILE_BYTES
+ ).expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=" +
NIL_FILE_BYTES.length)
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(true),
+ ByteArrays.EMPTY_ARRAY
+ );
+
+ // Perform the test.
+ final StageId stageId = new StageId("xyz", 1);
+ final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("testChannel", false);
+ final Future<List<List<Object>>> framesFuture = readChannelAsync(channel);
+
+ Assert.assertFalse(workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get());
+ Assert.assertFalse(workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get());
+ Assert.assertTrue(workerClient.fetchChannelData(WORKER_ID, stageId, 2,
NIL_FILE_BYTES.length, channel).get());
+ channel.doneWriting(); // Caller is expected to call doneWriting after
fetchChannelData returns true.
+
+ Assert.assertEquals(
+ 0,
+ framesFuture.get().size()
+ );
+ }
+
+ @Test
+ public void test_fetchChannelData_empty_serviceClientError()
+ {
+ workerServiceClient.expectAndThrow(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ new IOException("Some error")
+ );
+
+ // Perform the test.
+ final StageId stageId = new StageId("xyz", 1);
+ final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("testChannel", false);
+
+ final ExecutionException e = Assert.assertThrows(
+ ExecutionException.class,
+ () -> workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get()
+ );
+
+ MatcherAssert.assertThat(
+ e.getCause(),
+ CoreMatchers.allOf(
+ CoreMatchers.instanceOf(IOException.class),
+ ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Some
error"))
+ )
+ );
+
+ channel.close();
+ }
+
+ @Test
+ public void test_fetchChannelData_nonEmpty() throws Exception
+ {
+ workerServiceClient.expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=0")
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ ImmutableMap.of(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_OCTET_STREAM),
+ FILE_BYTES
+ ).expectAndRespond(
+ new RequestBuilder(HttpMethod.GET, "/channels/xyz/1/2?offset=" +
FILE_BYTES.length)
+ .header(HttpHeaders.ACCEPT_ENCODING, "identity"),
+ HttpResponseStatus.OK,
+ fetchChannelDataResponseHeaders(true),
+ ByteArrays.EMPTY_ARRAY
+ );
+
+ // Perform the test.
+ final StageId stageId = new StageId("xyz", 1);
+ final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("testChannel", false);
+ final Future<List<List<Object>>> framesFuture = readChannelAsync(channel);
+
+ Assert.assertFalse(workerClient.fetchChannelData(WORKER_ID, stageId, 2, 0,
channel).get());
+ Assert.assertTrue(workerClient.fetchChannelData(WORKER_ID, stageId, 2,
FILE_BYTES.length, channel).get());
+ channel.doneWriting(); // Caller is expected to call doneWriting after
fetchChannelData returns true.
+
+ FrameTestUtil.assertRowsEqual(
+ FrameTestUtil.readRowsFromCursorFactory(new
QueryableIndexCursorFactory(TestIndex.getMMappedTestIndex())),
+ Sequences.simple(framesFuture.get())
+ );
+ }
+
+ private Future<List<List<Object>>> readChannelAsync(final
ReadableFrameChannel channel)
+ {
+ return exec.submit(() -> {
+ final List<List<Object>> retVal = new ArrayList<>();
+ while (!channel.isFinished()) {
+ FutureUtils.getUnchecked(channel.readabilityFuture(), false);
+
+ if (channel.canRead()) {
+ final Frame frame = channel.read();
+
retVal.addAll(FrameTestUtil.readRowsFromCursorFactory(FRAME_READER.makeCursorFactory(frame)).toList());
+ }
+ }
+ channel.close();
+ return retVal;
+ });
+ }
+
+ /**
+ * Returns a frame file (as bytes) from a sequence of frames.
+ */
+ private static byte[] toFileBytes(final Sequence<Frame> frames)
+ {
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final FrameFileWriter writer =
+ FrameFileWriter.open(Channels.newChannel(baos), null,
ByteTracker.unboundedTracker());
+ frames.forEach(frame -> {
+ try {
+ writer.writeFrame(frame, FrameFileWriter.NO_PARTITION);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ CloseableUtils.closeAndWrapExceptions(writer);
+ return baos.toByteArray();
+ }
+
+
+ /**
+ * Expected response headers for the "fetch channel data" API.
+ */
+ private static Map<String, String> fetchChannelDataResponseHeaders(final
boolean lastResponse)
+ {
+ final ImmutableMap.Builder<String, String> builder =
+ ImmutableMap.<String, String>builder()
+ .put(HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_OCTET_STREAM);
+
+ if (lastResponse) {
+ builder.put(
+ FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME,
+ FrameFileHttpResponseHandler.HEADER_LAST_FETCH_VALUE
+ );
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Worker client that communicates with a single worker named {@link
#WORKER_ID}.
+ */
+ private static class TestWorkerClient extends BaseWorkerClientImpl
+ {
+ private final ServiceClient workerServiceClient;
+
+ public TestWorkerClient(ObjectMapper objectMapper, ServiceClient
workerServiceClient)
+ {
+ super(objectMapper, MediaType.APPLICATION_JSON);
+ this.workerServiceClient = workerServiceClient;
+ }
+
+ @Override
+ protected ServiceClient getClient(String workerId)
+ {
+ if (WORKER_ID.equals(workerId)) {
+ return workerServiceClient;
+ } else {
+ throw new ISE("Expected workerId[%s], got[%s]", WORKER_ID, workerId);
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ // Nothing to close.
+ }
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/ChannelClosedForWritesException.java
b/processing/src/main/java/org/apache/druid/frame/channel/ChannelClosedForWritesException.java
new file mode 100644
index 00000000000..93379017491
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/frame/channel/ChannelClosedForWritesException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.channel;
+
+/**
+ * Exception thrown by {@link ReadableByteChunksFrameChannel#addChunk(byte[])}
when the channel has been closed
+ * for writes, i.e., after {@link
ReadableByteChunksFrameChannel#doneWriting()} or
+ * {@link ReadableByteChunksFrameChannel#close()} has been called.
+ */
+public class ChannelClosedForWritesException extends RuntimeException
+{
+ public ChannelClosedForWritesException()
+ {
+ super("Channel is no longer accepting writes");
+ }
+}
diff --git
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
index a4a40d70a38..79ad621de28 100644
---
a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
+++
b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
@@ -132,13 +132,15 @@ public class ReadableByteChunksFrameChannel implements
ReadableFrameChannel
* chunks. (This is not enforced; addChunk will continue to accept new
chunks even if the channel is over its limit.)
*
* When done adding chunks call {@code doneWriting}.
+ *
+ * @throws ChannelClosedForWritesException if the channel is closed
*/
@Nullable
public ListenableFuture<?> addChunk(final byte[] chunk)
{
synchronized (lock) {
if (noMoreWrites) {
- throw new ISE("Channel is no longer accepting writes");
+ throw new ChannelClosedForWritesException();
}
try {
diff --git
a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
index 32faac85276..a81d3914b23 100644
---
a/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
+++
b/processing/src/test/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannelTest.java
@@ -118,6 +118,19 @@ public class ReadableByteChunksFrameChannelTest
channel.close();
}
+ @Test
+ public void testAddChunkAfterDoneWriting()
+ {
+ try (final ReadableByteChunksFrameChannel channel =
ReadableByteChunksFrameChannel.create("test", false)) {
+ channel.doneWriting();
+
+ Assert.assertThrows(
+ ChannelClosedForWritesException.class,
+ () -> channel.addChunk(new byte[]{})
+ );
+ }
+ }
+
@Test
public void testTruncatedFrameFile() throws IOException
{
diff --git a/server/src/test/java/org/apache/druid/rpc/MockServiceClient.java
b/server/src/test/java/org/apache/druid/rpc/MockServiceClient.java
index da817c3da3b..021db219d96 100644
--- a/server/src/test/java/org/apache/druid/rpc/MockServiceClient.java
+++ b/server/src/test/java/org/apache/druid/rpc/MockServiceClient.java
@@ -41,6 +41,7 @@ import java.util.Queue;
public class MockServiceClient implements ServiceClient
{
private final Queue<Expectation> expectations = new ArrayDeque<>(16);
+ private int requestNumber = -1;
@Override
public <IntermediateType, FinalType> ListenableFuture<FinalType>
asyncRequest(
@@ -50,8 +51,9 @@ public class MockServiceClient implements ServiceClient
{
final Expectation expectation = expectations.poll();
+ requestNumber++;
Assert.assertEquals(
- "request",
+ "request[" + requestNumber + "]",
expectation == null ? null : expectation.request,
requestBuilder
);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]