This is an automated email from the ASF dual-hosted git repository.
awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 345fce8 KUDU-1260: Fix prefetching bug on Java scanner
345fce8 is described below
commit 345fce81aa52fa378a57d6b610d42e751a1387a4
Author: Hongjiang Zhang <[email protected]>
AuthorDate: Mon Aug 16 11:57:54 2021 +0800
KUDU-1260: Fix prefetching bug on Java scanner
Add a unit test for prefetching. The test runs two concurrent threads: a
writer thread and a scanner thread. The writer thread records the timestamp
of its latest write, and the scanner thread creates two snapshot scanners at
that timestamp (one with and one without prefetching). By comparing the row
counts returned by the two scanners, we can verify the prefetching result.
In the current implementation, when prefetching is enabled, a newly
prefetched RowResultIterator can overwrite a previous one that has not yet
been consumed; as a result, some data is lost. The fix is simple: cache the
prefetched result in an atomic reference so that it cannot be overwritten.
Furthermore, with prefetching enabled there are at most two concurrent
ScanRequests sent to the tserver. When the scan reaches the end of the data,
only one of them gets a response with hasMore=false; the other one fails
with a "scanner not found (it may have expired)" error.
The same issue occurs for KeepAliveRequest. This patch addresses both cases
by ignoring that error once the scanner is closed.
Change-Id: I853a041d86c75ec196d7d4ff45af4673c5c5f5cd
Reviewed-on: http://gerrit.cloudera.org:8080/17773
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <[email protected]>
---
.../org/apache/kudu/client/AsyncKuduScanner.java | 117 +++++---
.../kudu/client/TestKuduScannerPrefetching.java | 307 +++++++++++++++++++++
2 files changed, 389 insertions(+), 35 deletions(-)
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 9206495..79e18f8 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Message;
@@ -251,7 +252,7 @@ public final class AsyncKuduScanner {
private boolean closed = false;
- private boolean hasMore = true;
+ private boolean canRequestMore = true;
private long numRowsReturned = 0;
@@ -281,10 +282,29 @@ public final class AsyncKuduScanner {
*/
private int sequenceId;
- private Deferred<RowResultIterator> prefetcherDeferred;
-
final long scanRequestTimeout;
+ /**
+ * The prefetching result is cached in memory. This atomic reference is used
to prevent
+ * two concurrent prefetches from racing, with the later one overwriting the
earlier one.
+ */
+ private AtomicReference<Deferred<RowResultIterator>>
cachedPrefetcherDeferred =
+ new AtomicReference<>();
+
+ /**
+ * When scanner's prefetching is enabled, there are at most two concurrent
ScanRequests
+ * sent to the tserver. But if the scan data reached the end, only one
hasMore=false is returned.
+ * As a result, one of the ScanRequests got "scanner not found (it may have
expired)" exception.
+ * The same issue occurs for KeepAliveRequest.
+ *
+ * @param errorCode error code returned from tserver
+ * @return true if this can be ignored
+ */
+ boolean canBeIgnored(TabletServerErrorPB.Code errorCode) {
+ return errorCode == TabletServerErrorPB.Code.SCANNER_EXPIRED &&
+ prefetching && closed;
+ }
+
AsyncKuduScanner(AsyncKuduClient client, KuduTable table, List<String>
projectedNames,
List<Integer> projectedIndexes, ReadMode readMode, boolean
isFaultTolerant,
long scanRequestTimeout,
@@ -362,7 +382,7 @@ public final class AsyncKuduScanner {
// short circuited without contacting any tablet servers.
if (!pruner.hasMorePartitionKeyRanges()) {
LOG.debug("Short circuiting scan");
- this.hasMore = false;
+ this.canRequestMore = false;
this.closed = true;
}
@@ -421,11 +441,11 @@ public final class AsyncKuduScanner {
}
/**
- * Tells if the last rpc returned that there might be more rows to scan.
+ * Tells if there may be more data to scan, either via another RPC or from a cached prefetch result.
* @return true if there might be more data to scan, else false
*/
public boolean hasMoreRows() {
- return this.hasMore;
+ return this.canRequestMore || cachedPrefetcherDeferred.get() != null;
}
/**
@@ -529,6 +549,10 @@ public final class AsyncKuduScanner {
*/
public Deferred<RowResultIterator> nextRows() {
if (closed) { // We're already done scanning.
+ if (prefetching && cachedPrefetcherDeferred.get() != null) {
+ // return the cached result and reset the cache.
+ return cachedPrefetcherDeferred.getAndUpdate((v) -> null);
+ }
return Deferred.fromResult(null);
} else if (tablet == null) {
Callback<Deferred<RowResultIterator>, AsyncKuduScanner.Response> cb =
@@ -578,7 +602,7 @@ public final class AsyncKuduScanner {
}
scannerId = resp.scannerId;
sequenceId++;
- hasMore = resp.more;
+ canRequestMore = resp.more;
if (LOG.isDebugEnabled()) {
LOG.debug("Scanner {} opened on {}", Bytes.pretty(scannerId),
tablet);
}
@@ -602,7 +626,7 @@ public final class AsyncKuduScanner {
// Stop scanning if the non-covered range is past the end
partition key.
if (!pruner.hasMorePartitionKeyRanges()) {
- hasMore = false;
+ canRequestMore = false;
closed = true; // the scanner is closed on the other side at
this point
return Deferred.fromResult(RowResultIterator.empty());
}
@@ -624,8 +648,9 @@ public final class AsyncKuduScanner {
// We need to open the scanner first.
return
client.sendRpcToTablet(getOpenRequest()).addCallbackDeferring(cb).addErrback(eb);
- } else if (prefetching && prefetcherDeferred != null) {
- // TODO KUDU-1260 - Check if this works and add a test
+ } else if (prefetching && cachedPrefetcherDeferred.get() != null) {
+ Deferred<RowResultIterator> prefetcherDeferred =
+ cachedPrefetcherDeferred.getAndUpdate((v) -> null);
prefetcherDeferred.chain(new
Deferred<RowResultIterator>().addCallback(prefetch));
return prefetcherDeferred;
}
@@ -641,9 +666,15 @@ public final class AsyncKuduScanner {
new Callback<RowResultIterator, RowResultIterator>() {
@Override
public RowResultIterator call(RowResultIterator arg) throws Exception {
- if (hasMoreRows()) {
- prefetcherDeferred = client.scanNextRows(AsyncKuduScanner.this)
- .addCallbacks(gotNextRow, nextRowErrback());
+ if (canRequestMore) {
+ if (cachedPrefetcherDeferred.get() == null) {
+ Deferred<RowResultIterator> prefetcherDeferred =
+ client.scanNextRows(AsyncKuduScanner.this)
+ .addCallbacks(gotNextRow, nextRowErrback());
+ if (!cachedPrefetcherDeferred.compareAndSet(null,
prefetcherDeferred)) {
+ LOG.info("Skip one prefetching because two concurrent prefetching
scan occurs");
+ }
+ }
}
return null;
}
@@ -664,7 +695,7 @@ public final class AsyncKuduScanner {
return resp.data;
}
sequenceId++;
- hasMore = resp.more;
+ canRequestMore = resp.more;
return resp.data;
}
@@ -710,7 +741,7 @@ public final class AsyncKuduScanner {
// Stop scanning if we have scanned until or past the end partition key, or
// if we have fulfilled the limit.
if (!pruner.hasMorePartitionKeyRanges() || numRowsReturned >= limit) {
- hasMore = false;
+ canRequestMore = false;
closed = true; // the scanner is closed on the other side at this point
return;
}
@@ -853,6 +884,10 @@ public final class AsyncKuduScanner {
*/
public Deferred<Void> keepAlive() {
if (closed) {
+ if (prefetching && cachedPrefetcherDeferred.get() != null) {
+ // skip sending keep alive if all of the data has been fetched in
prefetching mode
+ return Deferred.fromResult(null);
+ }
throw new IllegalStateException("Scanner has already been closed");
}
return client.keepAlive(this);
@@ -1007,7 +1042,14 @@ public final class AsyncKuduScanner {
ScannerKeepAliveResponsePB.Builder builder =
ScannerKeepAliveResponsePB.newBuilder();
readProtobuf(callResponse.getPBMessage(), builder);
ScannerKeepAliveResponsePB resp = builder.build();
- TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
+ TabletServerErrorPB error = null;
+ if (resp.hasError()) {
+ if (canBeIgnored(resp.getError().getCode())) {
+ LOG.info("Ignore false alert of scanner not found for keep alive
request");
+ } else {
+ error = resp.getError();
+ }
+ }
return new Pair<>(null, error);
}
}
@@ -1156,25 +1198,30 @@ public final class AsyncKuduScanner {
// Error handling.
if (error != null) {
- switch (error.getCode()) {
- case TABLET_NOT_FOUND:
- case TABLET_NOT_RUNNING:
- if (state == State.OPENING || (state == State.NEXT &&
isFaultTolerant)) {
- // Doing this will trigger finding the new location.
- return new Pair<>(null, error);
- } else {
- Status statusIncomplete = Status.Incomplete("Cannot continue
scanning, " +
- "the tablet has moved and this isn't a fault tolerant scan");
- throw new NonRecoverableException(statusIncomplete);
- }
- case SCANNER_EXPIRED:
- if (isFaultTolerant) {
- Status status = Status.fromTabletServerErrorPB(error);
- throw new FaultTolerantScannerExpiredException(status);
- }
- // fall through
- default:
- break;
+ if (canBeIgnored(resp.getError().getCode())) {
+ LOG.info("Ignore false alert of scanner not found for scan request");
+ error = null;
+ } else {
+ switch (error.getCode()) {
+ case TABLET_NOT_FOUND:
+ case TABLET_NOT_RUNNING:
+ if (state == State.OPENING || (state == State.NEXT &&
isFaultTolerant)) {
+ // Doing this will trigger finding the new location.
+ return new Pair<>(null, error);
+ } else {
+ Status statusIncomplete = Status.Incomplete("Cannot continue
scanning, " +
+ "the tablet has moved and this isn't a fault tolerant
scan");
+ throw new NonRecoverableException(statusIncomplete);
+ }
+ case SCANNER_EXPIRED:
+ if (isFaultTolerant) {
+ Status status = Status.fromTabletServerErrorPB(error);
+ throw new FaultTolerantScannerExpiredException(status);
+ }
+ // fall through
+ default:
+ break;
+ }
}
}
// TODO: Find a clean way to plumb in reuseRowResult.
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScannerPrefetching.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScannerPrefetching.java
new file mode 100644
index 0000000..9383ba9
--- /dev/null
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScannerPrefetching.java
@@ -0,0 +1,307 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import static
org.apache.kudu.client.AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS;
+import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
+import static org.apache.kudu.test.ClientTestUtil.createBasicSchemaInsert;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
+import static org.apache.kudu.test.KuduTestHarness.DEFAULT_SLEEP;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.test.KuduTestHarness;
+
+/**
+ * KUDU-1260: Test Kudu scanner prefetching
+ *
+ */
+public class TestKuduScannerPrefetching {
+ private static final Logger LOG = LoggerFactory.getLogger(ITClient.class);
+
+ private static final String RUNTIME_PROPERTY_NAME =
"scannerwithprefetching.runtime.seconds";
+ private static final long DEFAULT_RUNTIME_SECONDS = 60;
+
+ // Time we'll spend waiting at the end of the test for things to settle. Also
+ // the minimum this test can run for.
+ private static final long TEST_MIN_RUNTIME_SECONDS = 2;
+
+ private static final long TEST_TIMEOUT_SECONDS = 600000;
+
+ private static final String TABLE_NAME =
+ TestKuduScannerPrefetching.class.getName() + "-" +
System.currentTimeMillis();
+
+ // Tracks whether it's time for the test to end or not.
+ private CountDownLatch keepRunningLatch;
+
+ // If the test fails, will contain an exception that describes the failure.
+ private Exception failureException;
+
+ private KuduTable table;
+ private long runtimeInSeconds;
+
+ private volatile long sharedWriteTimestamp;
+
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness();
+
+ @Before
+ public void setUp() throws Exception {
+ // Set (or reset, in the event of a retry) test state.
+ keepRunningLatch = new CountDownLatch(1);
+ failureException = null;
+ sharedWriteTimestamp = 0;
+
+ // Extract and verify the test's running time.
+ String runtimeProp = System.getProperty(RUNTIME_PROPERTY_NAME);
+ runtimeInSeconds = runtimeProp == null ? DEFAULT_RUNTIME_SECONDS :
Long.parseLong(runtimeProp);
+ if (runtimeInSeconds < TEST_MIN_RUNTIME_SECONDS || runtimeInSeconds >
TEST_TIMEOUT_SECONDS) {
+ Assert.fail("This test needs to run more than " +
TEST_MIN_RUNTIME_SECONDS + " seconds" +
+ " and less than " + TEST_TIMEOUT_SECONDS + " seconds");
+ }
+ LOG.info("Test will run for {} seconds", runtimeInSeconds);
+
+ // Create the test table.
+ CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
+ builder.setRangePartitionColumns(ImmutableList.of("key"));
+ table = harness.getClient().createTable(TABLE_NAME, getBasicSchema(),
builder);
+ }
+
+ /**
+ * Checks that two scanners, one with and one without prefetching, return the same scan results.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = TEST_TIMEOUT_SECONDS)
+ public void testWithPrefetching() throws Exception {
+ List<Thread> threads = new ArrayList<>();
+ TestKuduScannerPrefetching.WriterThread wt = new
TestKuduScannerPrefetching.WriterThread();
+ TestKuduScannerPrefetching.ScannerThread st = new
TestKuduScannerPrefetching.ScannerThread();
+ threads.add(new Thread(wt, "writer-test-thread"));
+ threads.add(new Thread(st, "scanner-test-thread"));
+ for (Thread thread : threads) {
+ thread.setUncaughtExceptionHandler(new
TestKuduScannerPrefetching.UncaughtExceptionHandler());
+ thread.start();
+ }
+
+ // If we time out here, the test ran to completion and passed. Otherwise, a
+ // count down was triggered from an error and the test failed.
+ boolean failure = keepRunningLatch.await(runtimeInSeconds,
TimeUnit.SECONDS);
+ if (!failure) {
+ // The test passed but the threads are still running; tell them to stop.
+ keepRunningLatch.countDown();
+ }
+
+ for (Thread thread : threads) {
+ // Give plenty of time for threads to stop.
+ thread.join(DEFAULT_SLEEP);
+ }
+
+ if (failure) {
+ throw failureException;
+ }
+ Assert.assertTrue(wt.currentRowKey + " should be higher than 0",
wt.currentRowKey > 0);
+ Assert.assertTrue(st.totalRowCount + " should be higher than 0",
st.totalRowCount > 0);
+ }
+
+ /**
+ * Logs an error message and triggers the count down latch, stopping this
test.
+ *
+ * @param message error message to print
+ * @param exception optional exception to print
+ */
+ private void reportError(String message, Exception exception) {
+ failureException = new Exception(message, exception);
+ keepRunningLatch.countDown();
+ }
+
+ /**
+ * Thread that writes rows with sequentially increasing keys to the table
and records
+ * the timestamp of each successful write in sharedWriteTimestamp.
+ */
+ class WriterThread implements Runnable {
+
+ private final KuduSession session = harness.getClient().newSession();
+ private int currentRowKey = 0;
+
+ @Override
+ public void run() {
+
session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
+ while (keepRunningLatch.getCount() > 0) {
+ try {
+ OperationResponse resp =
session.apply(createBasicSchemaInsert(table, currentRowKey));
+ if (hasRowErrorAndReport(resp)) {
+ return;
+ }
+ currentRowKey++;
+ } catch (Exception e) {
+ if (keepRunningLatch.getCount() == 0) {
+ // Likely shutdown() related.
+ LOG.error("Error occurs: " + e.getMessage());
+ return;
+ }
+ reportError("Got error while inserting row " + currentRowKey, e);
+ return;
+ }
+ }
+ LOG.info("Stop writing");
+ }
+
+ private boolean hasRowErrorAndReport(OperationResponse resp) {
+ if (resp != null && resp.hasRowError()) {
+ reportError("The following RPC " + resp.getOperation().getRow() +
+ " returned this error: " + resp.getRowError(), null);
+ return true;
+ }
+
+ if (resp == null) {
+ return false;
+ }
+
+ sharedWriteTimestamp = resp.getWriteTimestampRaw();
+ return false;
+ }
+ }
+
+ /**
+ * Thread that scans the table. Once a write timestamp is available, it
repeatedly runs full table scans.
+ */
+ class ScannerThread implements Runnable {
+ // Updated by calling a full scan.
+ private int totalRowCount = 0;
+
+ @Override
+ public void run() {
+ while (keepRunningLatch.getCount() > 0) {
+ boolean shouldContinue = true;
+ if (sharedWriteTimestamp == 0) {
+ shouldContinue = true;
+ } else {
+ shouldContinue = fullScan();
+ }
+ if (!shouldContinue) {
+ return;
+ }
+
+ if (totalRowCount == 0) {
+ try {
+ keepRunningLatch.await(50, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Test is stopping.
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs full table scans with and without prefetching and verifies that the results match
+ *
+ * @return true if the full scan was successful, false if there was an
error
+ */
+ private boolean fullScan() {
+ int rowCount;
+ int rowCount2;
+ TimeoutTracker timeoutTracker = new TimeoutTracker();
+ timeoutTracker.setTimeout(DEFAULT_SLEEP);
+
+ while (keepRunningLatch.getCount() > 0 && !timeoutTracker.timedOut()) {
+ long snapshot = sharedWriteTimestamp;
+ KuduScanner scanner =
getSnapshotScannerBuilder(snapshot).prefetching(true).build();
+ KuduScanner scannerNoPrefetching =
+ getSnapshotScannerBuilder(snapshot).prefetching(false).build();
+ try {
+ rowCount = countRowsInScan(scanner);
+ rowCount2 = countRowsInScan(scannerNoPrefetching);
+ } catch (KuduException e) {
+ return checkAndReportError("Got error while row counting", e);
+ }
+ Assert.assertEquals(rowCount, rowCount2);
+ totalRowCount += rowCount;
+ // Due to the lack of KUDU-430, we need to loop for a while.
+ try {
+ keepRunningLatch.await(50, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // No need to do anything, we'll exit the loop once we test
getCount() in the condition.
+ }
+ }
+ return !timeoutTracker.timedOut();
+ }
+
+ private KuduScanner.KuduScannerBuilder getSnapshotScannerBuilder(long
snapshot) {
+ return harness.getClient().newScannerBuilder(table)
+ .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+ .snapshotTimestampRaw(snapshot)
+ .batchSizeBytes(128)
+ .keepAlivePeriodMs(DEFAULT_KEEP_ALIVE_PERIOD_MS)
+ .setFaultTolerant(false);
+ }
+
+ /**
+ * Checks whether the passed exception is tolerable (a timeout, a NotFound
status such as
+ * "Scanner not found", or "Invalid call sequence ID"). If it is, returns true,
+ * else it reports the error and returns false. This is needed because the
scans in this client aren't fault tolerant.
+ *
+ * @param message message to print if the exception contains a real error
+ * @param e the exception to check
+ * @return true if the scanner failed on a non-FATAL error, otherwise
false which will kill
+ * this test
+ */
+ private boolean checkAndReportError(String message, KuduException e) {
+ // It's possible to get timeouts if we're unlucky. A particularly common
one is
+ // "could not wait for desired snapshot timestamp to be consistent"
since we're using
+ // READ_AT_SNAPSHOT scanners.
+ // TODO revisit once KUDU-1656 is taken care of.
+ if (e.getStatus().isTimedOut()) {
+ LOG.warn("Received a scan timeout", e);
+ return true;
+ }
+ // Do nasty things, expect nasty results. The scanners are a bit too
happy to retry TS
+ // disconnections so we might end up retrying a scanner on a node that
restarted, or we might
+ // get disconnected just after sending an RPC so when we reconnect to
the same TS we might get
+ // the "Invalid call sequence ID" message.
+ if (!e.getStatus().isNotFound() &&
+ !e.getStatus().getMessage().contains("Invalid call sequence
ID")) {
+ reportError(message, e);
+ return false;
+ }
+ return true;
+ }
+ }
+
+ private class UncaughtExceptionHandler implements
Thread.UncaughtExceptionHandler {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Only report an error if we're still running, else we'll spam the log.
+ if (keepRunningLatch.getCount() != 0) {
+ reportError("Uncaught exception", new Exception(e));
+ }
+ }
+ }
+}