[
https://issues.apache.org/jira/browse/KUDU-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752770#comment-16752770
]
Will Berkeley commented on KUDU-1868:
-------------------------------------
I've done a good amount of investigation into this now.
First of all, here's a changelist that adds two tests to the Java client that
expose problems related to how timeouts work in the Java client.
{noformat}
commit 190aedeef663212cd0ce37d45e5cde35e0de39e8
Author: Will Berkeley <[email protected]>
Date: Fri Jan 25 14:09:54 2019 -0800
KUDU-1868 tests
Change-Id: I8d823b63ac0a41cc5e42b63a7c19e0ef777e1dea
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index b849e9f63..33e280021 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -61,6 +61,7 @@ import
org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.util.TimestampUtil;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -306,6 +307,51 @@ public class TestKuduClient {
}
}
+ /*
+ * This is a repro for KUDU-1868. When scanning, the server might be slow to
respond to an
+ * individual TabletServerService.Scan request. If it takes longer than the
socket read timeout
+ * to respond, and there isn't other traffic on the connection, the netty
Channel may send a
+ * socket read timeout event. This will cause the scan request to be
retried. If the scan request
+ * is not the first scan request, this will cause the server to see the same
call id for the
+ * same scanner twice, causing the "Invalid call sequence ID in scan
request" exception.
+ */
+ @Test(timeout = 100000)
+ @Ignore
+  @TabletServerConfig(flags = {
"--scanner_inject_latency_on_subsequent_batches_ms=1000" })
+ public void testKUDU1868() throws Exception {
+ // Create a basic table and load it with data.
+ int numRows = 1000;
+ KuduTable table = client.createTable(
+ TABLE_NAME,
+ basicSchema,
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"),
2));
+ KuduSession session = client.newSession();
+ for (int i = 0; i < numRows; i++) {
+ Insert insert = createBasicSchemaInsert(table, i);
+ session.apply(insert);
+ }
+
+ // Make a new client with a socket read timeout much shorter than how long
it will take the
+ // server to respond to continue scan requests, and use it to scan the
table created above.
+ KuduClient shortRecvTimeoutClient =
+ new KuduClient.KuduClientBuilder(client.getMasterAddressesAsString())
+ .defaultSocketReadTimeoutMs(500)
+ .build();
+
shortRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+ KuduTable shortRecvTimeoutTable =
shortRecvTimeoutClient.openTable(TABLE_NAME);
+ // Set a small batch size so there will be data for multiple roundtrips.
+ KuduScanner scanner = shortRecvTimeoutClient
+ .newScannerBuilder(shortRecvTimeoutTable)
+ .batchSizeBytes(100)
+ .build();
+
+    // The first request that creates the scanner will not hang and will be
fine.
+ scanner.nextRows();
+
+ // The second will result in "Invalid call sequence ID in scan request",
but it should be fine.
+ scanner.nextRows();
+ }
+
/**
* Test creating a table with columns with different combinations of NOT
NULL and
* default values, inserting rows, and checking the results are as expected.
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index db3dcbd56..57595f292 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -19,10 +19,12 @@ package org.apache.kudu.client;
import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
import org.junit.Rule;
import org.junit.Test;
@@ -59,7 +61,7 @@ public class TestTimeouts {
KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
- OperationResponse response =
lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+ OperationResponse response =
lowTimeoutSession.apply(createBasicSchemaInsert(table, 0));
assertTrue(response.hasRowError());
assertTrue(response.getRowError().getErrorStatus().isTimedOut());
@@ -71,4 +73,56 @@ public class TestTimeouts {
assertTrue(ex.getStatus().isTimedOut());
}
}
+
+ /**
+   * This test checks for an unfortunate behavior of the Java client: in
order for an RPC to
+ * timeout, there needs to be an event on its Connection. For example, if
the remote hangs
+ * responding to a scan request, without a socket read timeout the client
will not time out
+ * the scan request RPC.
+ */
+ @Test(timeout = 100000)
+ @TabletServerConfig(flags = {
"--scanner_inject_latency_on_each_batch_ms=10000" })
+ public void testNoTimeoutWithoutSocketReadTimeout() throws Exception {
+ // Set up a table with one row.
+ KuduClient client = harness.getClient();
+ KuduTable table = client.createTable(
+ TABLE_NAME,
+ getBasicSchema(),
+ getBasicCreateTableOptions());
+ assertFalse(client
+ .newSession()
+ .apply(createBasicSchemaInsert(table, 0))
+ .hasRowError());
+
+ // Create a new client with no socket read timeout (0 means do not set a
read timeout).
+ AsyncKuduClient asyncNoRecvTimeoutClient =
+ new
AsyncKuduClient.AsyncKuduClientBuilder(harness.getMasterAddressesAsString())
+ .defaultSocketReadTimeoutMs(0)
+ .build();
+ // Propagate the timestamp to be sure we should see the row that was
inserted by another client.
+
asyncNoRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+ KuduTable asyncNoRecvTimeoutTable =
asyncNoRecvTimeoutClient.openTable(TABLE_NAME).join();
+
+ // Scan with a short timeout. It needs to be long enough to allow client
<-> master lookup,
+ // though.
+ AsyncKuduScanner scanner = asyncNoRecvTimeoutClient
+ .newScannerBuilder(asyncNoRecvTimeoutTable)
+ .scanRequestTimeout(50)
+ .build();
+
+ // With no socket read timeout and a very delayed response from the
server, scan request RPCs
+ // should not time out.
+ try {
+ scanner.nextRows().join(1000);
+ fail("should not have completed nextRows");
+ } catch (com.stumbleupon.async.TimeoutException e) {
+ // Good. The scan request did not time out- joining on a response did.
+ } catch (NonRecoverableException e) {
+ // Special case a wrong kind of timeout.
+ if (e.getStatus().isTimedOut()) {
+ fail("scan request RPC timed out, but it is not expected to");
+ }
+ throw e;
+ }
+ }
}
diff --git a/src/kudu/tserver/tablet_service.cc
b/src/kudu/tserver/tablet_service.cc
index 9834f8b4d..e1cdded0d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -135,6 +135,12 @@ DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
"Used for tests.");
TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe);
+DEFINE_int32(scanner_inject_latency_on_subsequent_batches_ms, 0,
+             "If set, the scanner will pause the specified number of
milliseconds "
+ "before reading each batch of data on the tablet server, after
the first batch. "
+ "Used for tests.");
+TAG_FLAG(scanner_inject_latency_on_subsequent_batches_ms, unsafe);
+
DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
"If set, the scanner will return a ServiceUnavailable Status on "
"any Scan continuation RPC call. Used for tests.");
@@ -2121,6 +2127,11 @@ Status
TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) {
SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms));
}
+ if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_subsequent_batches_ms >
0 &&
+ !req->has_new_scan_request())) {
+ SleepFor(MonoDelta::FromMilliseconds(
+ FLAGS_scanner_inject_latency_on_subsequent_batches_ms));
+ }
Status s = iter->NextBlock(&block);
if (PREDICT_FALSE(!s.ok())) {
{noformat}
Note that I added an extra flag to the server, and that testKUDU1868 is
expected to fail.
So, what happens to cause the characteristic {{Invalid call sequence ID in scan
request}} exception is the following:
* A new scan request arrives on the server. It is processed successfully and a
response is returned to the client. There are more rows to scan.
* The client sends a continue scan request for more rows. This request takes a
long time to respond.
* There is no other traffic on the connection to delay the socket read timeout
(aka the recv timeout), so the socket read timeout pops.
* This causes the Java client to tear down the connection, put up a new one,
and resend the continue scan request. However, a previous scan request with the
same call id has already been processed or is being processed, so the server
rejects it with {{Invalid call sequence ID in scan request}}.
We shouldn't be using socket read timeouts at all, except possibly during
negotiation when negotiation is the only activity on the socket. Timeouts
should be a property of a call, not a connection. Getting rid of socket read
timeout would prevent this error, and it would fix the problem where a call
that is hanging on a server with which the client is otherwise communicating
hangs forever. To do this we need to trigger timeouts in the client without
relying on netty Channel events. The simplest way to do this is to schedule
timeout tasks on a timer-- we have a {{HashedWheelTimer}} available. However,
there's a significant drawback. An RPC may actually spawn many sub RPCs,
especially if the RPC needs to retry due to, say, backpressure. Each of these
RPCs needs a different timeout in order to make the overall RPC call obey its
timeout (e.g. if the user calls {{KuduClient#listTables}} with a timeout of
10s, and {{ConnectToCluster}} happens in 5s, and then an actual ListTables RPC
exchange happens, the ListTables RPC needs a timeout of 5s). There's no way to
cancel a task on the timer (see the {{org.jboss.netty.util.Timer}} interface or
its successor in netty 4), so all of the timer tasks scheduled for all RPCs
will run after their timeout expires, even if the RPC completed in 1/1000 of
the timeout and has long since been able to be GC'd. Assuming a default timeout
for all RPCs, this would cause memory usage O(number of RPCs launched in last
timeout period)...i.e. the memory usage would grow linearly with RPC
throughput. That seems kind of lousy.
On the other hand, it's what
[hbase|https://github.com/apache/hbase/blob/025ddce868eb06b4072b5152c5ffae5a01e7ae30/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java#L193]
and the thirdparty [asynchbase
client|https://github.com/OpenTSDB/asynchbase/blob/master/src/HBaseClient.java#L978]
do, so maybe it's not so bad in practice? It doesn't smell acceptable to me.
Looking around a bit more, it does look like Cassandra ran into this issue in
their [csharp client|https://datastax-oss.atlassian.net/browse/CSHARP-376].
They have an implementation of a hash wheel timer that prunes cancelled tasks.
> Java client mishandles socket read timeouts for scans
> -----------------------------------------------------
>
> Key: KUDU-1868
> URL: https://issues.apache.org/jira/browse/KUDU-1868
> Project: Kudu
> Issue Type: Bug
> Components: client
> Affects Versions: 1.2.0
> Reporter: Jean-Daniel Cryans
> Assignee: Will Berkeley
> Priority: Major
>
> Scan calls from the Java client that take more than the socket read timeout
> get retried (unless the operation timeout has expired) instead of being
> killed. Users will see this:
> {code}
> org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in
> scan request
> {code}
> Note that the right behavior here would still end up killing the scanner, so
> this is really a problem the user has to deal with! It's usually caused by
> slow IO, combined with very selective scans.
> Workaround: set defaultSocketReadTimeoutMs higher, ideally equal to
> defaultOperationTimeoutMs (the defaults are 10 and 30 seconds respectively).
> But really the user should investigate why the single scans are so slow.
> One potentially easy fix to this is to handle retries differently for
> scanners so that the user gets nicer exception. A harder fix is to handle
> socket read timeouts completely differently, basically it should be per-RPC
> and not per TabletClient like it is right now.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)