[ 
https://issues.apache.org/jira/browse/KUDU-1868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16752770#comment-16752770
 ] 

Will Berkeley commented on KUDU-1868:
-------------------------------------

I've done a good amount of investigation into this now.

First of all, here's a changelist that adds two tests to the Java client that 
expose problems related to how timeouts work in the Java client.

{noformat}
commit 190aedeef663212cd0ce37d45e5cde35e0de39e8
Author: Will Berkeley <[email protected]>
Date:   Fri Jan 25 14:09:54 2019 -0800

    KUDU-1868 tests
    
    Change-Id: I8d823b63ac0a41cc5e42b63a7c19e0ef777e1dea

diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index b849e9f63..33e280021 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -61,6 +61,7 @@ import 
org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.apache.kudu.test.ClientTestUtil;
 import org.apache.kudu.util.TimestampUtil;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -306,6 +307,51 @@ public class TestKuduClient {
     }
   }
 
+  /*
+   * This is a repro for KUDU-1868. When scanning, the server might be slow to 
respond to an
+   * individual TabletServerService.Scan request. If it takes longer than the 
socket read timeout
+   * to respond, and there isn't other traffic on the connection, the netty 
Channel may send a
+   * socket read timeout event. This will cause the scan request to be 
retried. If the scan request
+   * is not the first scan request, this will cause the server to see the same 
call id for the
+   * same scanner twice, causing the "Invalid call sequence ID in scan 
request" exception.
+   */
+  @Test(timeout = 100000)
+  @Ignore
+  @TabletServerConfig(flags = { 
"--scanner_inject_latency_on_subsequent_batches=1000" })
+  public void testKUDU1868() throws Exception {
+    // Create a basic table and load it with data.
+    int numRows = 1000;
+    KuduTable table = client.createTable(
+        TABLE_NAME,
+        basicSchema,
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
2));
+    KuduSession session = client.newSession();
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+
+    // Make a new client with a socket read timeout much shorter than how long 
it will take the
+    // server to respond to continue scan requests, and use it to scan the 
table created above.
+    KuduClient shortRecvTimeoutClient =
+        new KuduClient.KuduClientBuilder(client.getMasterAddressesAsString())
+            .defaultSocketReadTimeoutMs(500)
+            .build();
+    
shortRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+    KuduTable shortRecvTimeoutTable = 
shortRecvTimeoutClient.openTable(TABLE_NAME);
+    // Set a small batch size so there will be data for multiple roundtrips.
+    KuduScanner scanner = shortRecvTimeoutClient
+        .newScannerBuilder(shortRecvTimeoutTable)
+        .batchSizeBytes(100)
+        .build();
+
+    // The first request that create the scanner will not hand and will be 
fine.
+    scanner.nextRows();
+
+    // The second will result in "Invalid call sequence ID in scan request", 
but it should be fine.
+    scanner.nextRows();
+  }
+
   /**
    * Test creating a table with columns with different combinations of NOT 
NULL and
    * default values, inserting rows, and checking the results are as expected.
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
index db3dcbd56..57595f292 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestTimeouts.java
@@ -19,10 +19,12 @@ package org.apache.kudu.client;
 import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
 import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
 import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.KuduTestHarness.TabletServerConfig;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -59,7 +61,7 @@ public class TestTimeouts {
 
     KuduSession lowTimeoutSession = lowTimeoutsClient.newSession();
 
-    OperationResponse response = 
lowTimeoutSession.apply(createBasicSchemaInsert(table, 1));
+    OperationResponse response = 
lowTimeoutSession.apply(createBasicSchemaInsert(table, 0));
     assertTrue(response.hasRowError());
     assertTrue(response.getRowError().getErrorStatus().isTimedOut());
 
@@ -71,4 +73,56 @@ public class TestTimeouts {
       assertTrue(ex.getStatus().isTimedOut());
     }
   }
+
+  /**
+   * This tests checks for an unfortunate behavior of the Java client: in 
order for an RPC to
+   * timeout, there needs to be an event on its Connection. For example, if 
the remote hangs
+   * responding to a scan request, without a socket read timeout the client 
will not time out
+   * the scan request RPC.
+   */
+  @Test(timeout = 100000)
+  @TabletServerConfig(flags = { 
"--scanner_inject_latency_on_each_batch_ms=10000" })
+  public void testNoTimeoutWithoutSocketReadTimeout() throws Exception {
+    // Set up a table with one row.
+    KuduClient client = harness.getClient();
+    KuduTable table = client.createTable(
+        TABLE_NAME,
+        getBasicSchema(),
+        getBasicCreateTableOptions());
+    assertFalse(client
+        .newSession()
+        .apply(createBasicSchemaInsert(table, 0))
+        .hasRowError());
+
+    // Create a new client with no socket read timeout (0 means do not set a 
read timeout).
+    AsyncKuduClient asyncNoRecvTimeoutClient =
+        new 
AsyncKuduClient.AsyncKuduClientBuilder(harness.getMasterAddressesAsString())
+            .defaultSocketReadTimeoutMs(0)
+            .build();
+    // Propagate the timestamp to be sure we should see the row that was 
inserted by another client.
+    
asyncNoRecvTimeoutClient.updateLastPropagatedTimestamp(client.getLastPropagatedTimestamp());
+    KuduTable asyncNoRecvTimeoutTable = 
asyncNoRecvTimeoutClient.openTable(TABLE_NAME).join();
+
+    // Scan with a short timeout. It needs to be long enough to allow client 
<-> master lookup,
+    // though.
+    AsyncKuduScanner scanner = asyncNoRecvTimeoutClient
+        .newScannerBuilder(asyncNoRecvTimeoutTable)
+        .scanRequestTimeout(50)
+        .build();
+
+    // With no socket read timeout and a very delayed response from the 
server, scan request RPCs
+    // should not time out.
+    try {
+      scanner.nextRows().join(1000);
+      fail("should not have completed nextRows");
+    } catch (com.stumbleupon.async.TimeoutException e) {
+      // Good. The scan request did not time out- joining on a response did.
+    } catch (NonRecoverableException e) {
+      // Special case a wrong kind of timeout.
+      if (e.getStatus().isTimedOut()) {
+        fail("scan request RPC timed out, but it is not expected to");
+      }
+      throw e;
+    }
+  }
 }
diff --git a/src/kudu/tserver/tablet_service.cc 
b/src/kudu/tserver/tablet_service.cc
index 9834f8b4d..e1cdded0d 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -135,6 +135,12 @@ DEFINE_int32(scanner_inject_latency_on_each_batch_ms, 0,
              "Used for tests.");
 TAG_FLAG(scanner_inject_latency_on_each_batch_ms, unsafe);
 
+DEFINE_int32(scanner_inject_latency_on_subsequent_batches_ms, 0,
+             "If set, the scanner will pause the specified number of 
millisesconds "
+             "before reading each batch of data on the tablet server, after 
the first batch. "
+             "Used for tests.");
+TAG_FLAG(scanner_inject_latency_on_subsequent_batches_ms, unsafe);
+
 DEFINE_bool(scanner_inject_service_unavailable_on_continue_scan, false,
            "If set, the scanner will return a ServiceUnavailable Status on "
            "any Scan continuation RPC call. Used for tests.");
@@ -2121,6 +2127,11 @@ Status 
TabletServiceImpl::HandleContinueScanRequest(const ScanRequestPB* req,
     if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_each_batch_ms > 0)) {
       
SleepFor(MonoDelta::FromMilliseconds(FLAGS_scanner_inject_latency_on_each_batch_ms));
     }
+    if (PREDICT_FALSE(FLAGS_scanner_inject_latency_on_subsequent_batches_ms > 
0 &&
+                      !req->has_new_scan_request())) {
+      SleepFor(MonoDelta::FromMilliseconds(
+          FLAGS_scanner_inject_latency_on_subsequent_batches_ms));
+    }
 
     Status s = iter->NextBlock(&block);
     if (PREDICT_FALSE(!s.ok())) {

{noformat}

Note that I added an extra flag to the server, and that testKUDU1868 is 
expected to fail.

So, what happens to cause the characteristic {{Invalid call sequence ID in scan 
request}} exception is the following:

* A new scan request arrives on the server. It is processed successfully and a 
response is returned to the client. There are more rows to scan.
* The client sends a continue scan request for more rows. The server takes a 
long time to respond to this request.
* There is no other traffic on the connection to delay the socket read timeout 
(aka the recv timeout), so the socket read timeout pops.
* This causes the Java client to tear down the connection, establish a new one, 
and resend the continue scan request. However, a previous scan request with the 
same call id has already been processed or is being processed, so the server 
rejects it with {{Invalid call sequence ID in scan request}}.

We shouldn't be using socket read timeouts at all, except possibly during 
negotiation when negotiation is the only activity on the socket. Timeouts 
should be a property of a call, not a connection. Getting rid of socket read 
timeout would prevent this error, and it would fix the problem where a call 
that is hanging on a server with which the client is otherwise communicating 
hangs forever. To do this we need to trigger timeouts in the client without 
relying on netty Channel events. The simplest way to do this is to schedule 
timeout tasks on a timer-- we have a {{HashedWheelTimer}} available. However, 
there's a significant drawback. An RPC may actually spawn many sub RPCs, 
especially if the RPC needs to retry due to, say, backpressure. Each of these 
RPCs needs a different timeout in order to make the overall RPC call obey its 
timeout (e.g. if the user calls {{KuduClient#listTables}} with a timeout of 
10s, and {{ConnectToCluster}} happens in 5s, and then an actual ListTables RPC 
exchange happens, the ListTables RPC needs a timeout of 5s). There's no way to 
cancel a task on the timer (see the {{org.jboss.netty.util.Timer}} interface or 
its successor in netty 4), so all of the timer tasks scheduled for all RPCs 
will run after their timeout expires, even if the RPC completed in 1/1000 of 
the timeout and has long since been able to be GC'd. Assuming a default timeout 
for all RPCs, this would cause memory usage O(number of RPCs launched in last 
timeout period)...i.e. the memory usage would grow linearly with RPC 
throughput. That seems kind of lousy.

On the other hand, it's what 
[hbase|https://github.com/apache/hbase/blob/025ddce868eb06b4072b5152c5ffae5a01e7ae30/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java#L193]
 and the thirdparty [asynchbase 
client|https://github.com/OpenTSDB/asynchbase/blob/master/src/HBaseClient.java#L978]
 do, so maybe it's not so bad in practice? It doesn't smell acceptable to me. 
Looking around a bit more, it does look like Cassandra ran into this issue in 
their [csharp client|https://datastax-oss.atlassian.net/browse/CSHARP-376]. 
They have an implementation of a hash wheel timer that prunes cancelled tasks.

> Java client mishandles socket read timeouts for scans
> -----------------------------------------------------
>
>                 Key: KUDU-1868
>                 URL: https://issues.apache.org/jira/browse/KUDU-1868
>             Project: Kudu
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 1.2.0
>            Reporter: Jean-Daniel Cryans
>            Assignee: Will Berkeley
>            Priority: Major
>
> Scan calls from the Java client that take more than the socket read timeout 
> get retried (unless the operation timeout has expired) instead of being 
> killed. Users will see this:
> {code}
> org.apache.kudu.client.NonRecoverableException: Invalid call sequence ID in 
> scan request
> {code}
> Note that the right behavior here would still end up killing the scanner, so 
> this is really a problem the user has to deal with! It's usually caused by 
> slow IO, combined with very selective scans.
> Workaround: set defaultSocketReadTimeoutMs higher, ideally equal to 
> defaultOperationTimeoutMs (the defaults are 10 and 30 seconds respectively). 
> But really the user should investigate why the individual scans are so slow.
> One potentially easy fix for this is to handle retries differently for 
> scanners so that the user gets a nicer exception. A harder fix is to handle 
> socket read timeouts completely differently: basically, they should be per-RPC 
> and not per TabletClient like they are right now.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to