This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 8683b8bdb [Java] KUDU-3498 Scanner keeps alive in periodically
8683b8bdb is described below

commit 8683b8bdb675db96aac52d75a31d00232f7b9fb8
Author: xinghuayu007 <[email protected]>
AuthorDate: Tue Jan 16 12:52:09 2024 +0800

    [Java] KUDU-3498 Scanner keeps alive in periodically
    
    Kudu caches the scanner in the tablet server so that the client can
    continue reading from it. The scanner expires if it stays idle for
    longer than the configured scanner TTL. Sometimes the client reads a
    batch of data that is very large and takes a long time to process.
    When the client then reads the next batch using the same scanner, the
    scanner may already have expired, even if a keep-alive request was sent.
    
    This patch adds support for keeping a scanner alive periodically.
    It uses a timer to send keep-alive requests in the background, so
    the scanner does not expire while it is still in use.
    
    Change-Id: I50648e987b72aead472a20ff4336e3e7f23d8e06
    Reviewed-on: http://gerrit.cloudera.org:8080/20761
    Reviewed-by: Yifan Zhang <[email protected]>
    Tested-by: Yifan Zhang <[email protected]>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java   |  53 ++++++++-
 .../java/org/apache/kudu/client/KuduScanner.java   |  33 ++++++
 .../org/apache/kudu/client/TestKuduClient.java     | 120 +++++++++++++++++++++
 3 files changed, 205 insertions(+), 1 deletion(-)

diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index fd3930e33..1013a148b 100644
--- 
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ 
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -39,13 +39,18 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.Message;
 import com.google.protobuf.UnsafeByteOperations;
 import com.stumbleupon.async.Callback;
 import com.stumbleupon.async.Deferred;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 import org.slf4j.Logger;
@@ -287,6 +292,12 @@ public final class AsyncKuduScanner {
 
   private String queryId;
 
+  // Timer that drives the periodic keep-alive requests. Its worker thread is a
+  // daemon thread, and the wheel ticks every 20 ms.
+  // NOTE(review): one HashedWheelTimer is created per scanner and timer.stop() is
+  // never called, so every scanner that starts keep-alive leaves a worker thread
+  // behind, and Netty complains when many HashedWheelTimer instances are alive.
+  // Consider sharing a client-wide timer or stopping this one when the scanner closes.
+  private final HashedWheelTimer timer = new HashedWheelTimer(
+      new ThreadFactoryBuilder().setDaemon(true).build(), 20,
+      TimeUnit.MILLISECONDS);
+
+  // The most recently scheduled keep-alive timeout, or null if keep-alive was never
+  // started. It is written both by the caller's thread and by the timer thread, so
+  // it is volatile to make the latest value visible across threads.
+  private volatile Timeout keepAliveTimeout;
+
   /**
    * UUID of the tserver which the scanner is bound with. The following scans 
of
    * this scanner will be sent to the tserver.
@@ -477,7 +488,11 @@ public final class AsyncKuduScanner {
    * @return true if there might be more data to scan, else false
    */
  public boolean hasMoreRows() {
-    return this.canRequestMore || cachedPrefetcherDeferred.get() != null;
+    boolean hasMore = this.canRequestMore || cachedPrefetcherDeferred.get() != 
null;
+    // Side effect: once the scan is exhausted, stop the periodic keep-alive timer
+    // (if one was started) so no further keep-alive requests are scheduled.
+    if (!hasMore) {
+      stopKeepAlivePeriodically();
+    }
+    return hasMore;
   }
 
   /**
@@ -837,6 +852,7 @@ public final class AsyncKuduScanner {
     if (closed) {
       return Deferred.fromResult(null);
     }
+    stopKeepAlivePeriodically();
     return client.closeScanner(this).addCallback(closedCallback()); // TODO 
errBack ?
   }
 
@@ -960,6 +976,41 @@ public final class AsyncKuduScanner {
     return client.keepAlive(this);
   }
 
+  /**
+   * Package-private access point for {@link AsyncKuduScanner}s to keep 
themselves
+   * alive on tablet servers by sending keep-alive requests periodically.
+   * @param keepAliveIntervalMS the interval of sending keep-alive requests.
+   * @return true if starting keep-alive timer successfully.
+   */
+  boolean startKeepAlivePeriodically(int keepAliveIntervalMS) {
+    if (closed) {
+      return false;
+    }
+    final class KeepAliveTimer implements TimerTask {
+      @Override
+      public void run(final Timeout timeout) {
+        keepAlive();
+        keepAliveTimeout = AsyncKuduClient.newTimeout(timer, this, 
keepAliveIntervalMS);
+      }
+    }
+
+    keepAliveTimeout = AsyncKuduClient.newTimeout(timer, new KeepAliveTimer(),
+                                                  keepAliveIntervalMS);
+    return true;
+  }
+
+  /**
+   * Package-private access point for {@link AsyncKuduScanner}s to stop
+   * keep-alive timer.
+   * @return true if stopping keep-alive timer successfully.
+   */
+  boolean stopKeepAlivePeriodically() {
+    if (keepAliveTimeout != null) {
+      return keepAliveTimeout.cancel();
+    }
+    return true;
+  }
+
   /**
    * Returns an RPC to fetch the next rows.
    */
diff --git 
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java 
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 02142630b..79ba19743 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -102,6 +102,39 @@ public class KuduScanner implements Iterable<RowResult> {
     KuduClient.joinAndHandleException(asyncScanner.keepAlive());
   }
 
+  /**
+   * Keep the current remote scanner alive by sending keep-alive requests periodically.
+   * <p>
+   * startKeepAlivePeriodically() uses a timer to call keepAlive() every
+   * {@code keepAliveIntervalMS} milliseconds on a separate thread. This is useful if
+   * the client takes a long time to process the fetched data before it gets a chance
+   * to call keepAlive() itself. This can be called after the scanner is opened, and
+   * the timer can be stopped by calling stopKeepAlivePeriodically().
+   *
+   * @param keepAliveIntervalMS the interval between keep-alive requests, in
+   *     milliseconds; expected to be positive
+   * @return true if the keep-alive timer was started, false if the scanner is
+   *     already closed
+   * @throws KuduException if anything went wrong
+   */
+  public final boolean startKeepAlivePeriodically(int keepAliveIntervalMS) throws KuduException {
+    return asyncScanner.startKeepAlivePeriodically(keepAliveIntervalMS);
+  }
+
+  /**
+   * Stop keeping the current remote scanner alive periodically.
+   * <p>
+   * This stops sending the periodic keep-alive requests that were started by
+   * startKeepAlivePeriodically(). The timer is also stopped automatically once the
+   * scan has finished or the scanner is closed. It can, however, be stopped manually
+   * at any time by calling this method.
+   *
+   * @return true if no keep-alive timer was started or the pending keep-alive was
+   *     cancelled; false if it had already fired or been cancelled
+   */
+  public final boolean stopKeepAlivePeriodically() {
+    return asyncScanner.stopKeepAlivePeriodically();
+  }
+
   /**
    * @return true if the scanner has been closed.
    */
diff --git 
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java 
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index acb243cab..8f7073b13 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -48,6 +48,7 @@ import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
 
 import java.io.Closeable;
+import java.lang.reflect.Field;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
@@ -71,6 +72,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.stumbleupon.async.Deferred;
+import io.netty.util.Timeout;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -393,6 +395,124 @@ public class TestKuduClient {
     }
   }
 
+  /*
+   * Test keeping a scanner alive periodically beyond scanner ttl.
+   */
+  @Test(timeout = 100000)
+  @TabletServerConfig(flags = {
+      "--scanner_ttl_ms=" + SHORT_SCANNER_TTL_MS / 5,
+      "--scanner_gc_check_interval_us=" + SHORT_SCANNER_GC_US,
+  })
+  public void testKeepAlivePeriodically() throws Exception {
+    // Create a basic table and load it with data.
+    int numRows = 1000;
+    client.createTable(
+        TABLE_NAME,
+        basicSchema,
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 
3));
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+
+    // Start keep-alive timer and read all data out. After read out all data,
+    // the keep-alive timer will be cancelled.
+    {
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+          .batchSizeBytes(100)
+          .build();
+
+      scanner.startKeepAlivePeriodically(SHORT_SCANNER_TTL_MS / 10);
+      int rowCount = 0;
+      while (scanner.hasMoreRows()) {
+        // Sleep a long time to make scanner easy to be expired.
+        Thread.sleep(SHORT_SCANNER_TTL_MS / 2);
+        rowCount += scanner.nextRows().getNumRows();
+      }
+      assertEquals(numRows, rowCount);
+      // Check that keepAliveTimeout is cancelled.
+      Field fieldAsyncScanner = 
KuduScanner.class.getDeclaredField("asyncScanner");
+      fieldAsyncScanner.setAccessible(true);
+      AsyncKuduScanner asyncScanner = 
(AsyncKuduScanner)fieldAsyncScanner.get(scanner);
+      Field fieldKeepaliveTimeout =
+          AsyncKuduScanner.class.getDeclaredField("keepAliveTimeout");
+      fieldKeepaliveTimeout.setAccessible(true);
+      Timeout keepAliveTimeout = 
(Timeout)fieldKeepaliveTimeout.get(asyncScanner);
+      assertTrue(keepAliveTimeout.isCancelled());
+    }
+
+    // Start keep-alive timer then close it. After closing the client,
+    // the keep-alive timer will be closed.
+    {
+      KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, 
table)
+          .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+          .batchSizeBytes(100)
+          .build();
+
+      scanner.startKeepAlivePeriodically(SHORT_SCANNER_TTL_MS / 10);
+
+      // Check that keepAliveTimeout is not cancelled.
+      Field fieldAsyncScanner = 
KuduScanner.class.getDeclaredField("asyncScanner");
+      fieldAsyncScanner.setAccessible(true);
+      AsyncKuduScanner asyncScanner = 
(AsyncKuduScanner)fieldAsyncScanner.get(scanner);
+      Field fieldKeepaliveTimeout =
+          AsyncKuduScanner.class.getDeclaredField("keepAliveTimeout");
+      fieldKeepaliveTimeout.setAccessible(true);
+      Timeout keepAliveTimeout = 
(Timeout)fieldKeepaliveTimeout.get(asyncScanner);
+      assertFalse(keepAliveTimeout.isCancelled());
+
+      // Check that keepAliveTimeout is cancelled.
+      scanner.close();
+      assertTrue(keepAliveTimeout.isCancelled());
+    }
+  }
+
+  /*
+   * Test stopping the keep-alive timer.
+   */
+  @Test(timeout = 100000)
+  @TabletServerConfig(flags = {
+      "--scanner_ttl_ms=" + SHORT_SCANNER_TTL_MS / 5,
+      "--scanner_gc_check_interval_us=" + SHORT_SCANNER_GC_US,
+  })
+  public void testStopKeepAlivePeriodically() throws Exception {
+    // Create a basic table and load it with data.
+    int numRows = 1000;
+    client.createTable(
+        TABLE_NAME,
+        basicSchema,
+        new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 3));
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(TABLE_NAME);
+
+    for (int i = 0; i < numRows; i++) {
+      Insert insert = createBasicSchemaInsert(table, i);
+      session.apply(insert);
+    }
+
+    KuduScanner scanner = new KuduScanner.KuduScannerBuilder(asyncClient, table)
+        .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+        .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times.
+        .build();
+    // Start the keep-alive timer and then stop it. Without keep-alive requests,
+    // reading data is expected to fail because the scanner expires.
+    assertTrue(scanner.startKeepAlivePeriodically(SHORT_SCANNER_TTL_MS / 10));
+    assertTrue(scanner.stopKeepAlivePeriodically());
+    boolean expired = false;
+    while (scanner.hasMoreRows()) {
+      try {
+        // Sleep longer than the scanner TTL so the scanner expires.
+        Thread.sleep(SHORT_SCANNER_TTL_MS / 2);
+        scanner.nextRows();
+      } catch (Exception e) {
+        assertTrue(e.toString().contains("not found (it may have expired)"));
+        expired = true;
+        break;
+      }
+    }
+    // Without this check the test would also pass if the scan completed normally,
+    // i.e. if stopping the keep-alive timer had no effect.
+    assertTrue("expected the scanner to expire after stopping keep-alive", expired);
+  }
+
   /**
    * Test creating a table with columns with different combinations of NOT 
NULL and
    * default values, inserting rows, and checking the results are as expected.

Reply via email to