This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 5735f07 [java] Make the KuduScanner iterable
5735f07 is described below
commit 5735f07f8b6aaf5134c66863519052936aa7487f
Author: Grant Henke <[email protected]>
AuthorDate: Mon Mar 11 11:31:53 2019 -0500
[java] Make the KuduScanner iterable
This patch makes the KuduScanner iterable to simplify
the API for processing RowResults. It also pushes
the keep-alive calls down into the KuduScannerIterator
so that any user of the KuduClient automatically
benefits from them, just as the Spark integration does.
The existing RowResultIterator implementation reuses
the same RowResult object for all rows in the batch.
This can cause unexpected behavior when RowResults
from the scanner are stored. This patch changes
the RowResultIterator to instead create a new RowResult
object for every row, while still sharing the underlying
data. The previous reuse behavior remains available as an
opt-in optimization via setReuseRowResult(true).
I have replaced every place I could find where the scanner
is iterated manually with this new implementation.
The Spark implementations were replaced as well, but
they required a special hasNext method that invokes a
callback each time a new batch of rows is fetched.
Change-Id: I3e4ac59e30d0562c0a381d5e304af1dcfdcf5a1a
Reviewed-on: http://gerrit.cloudera.org:8080/12715
Reviewed-by: Adar Dembo <[email protected]>
Tested-by: Kudu Jenkins
---
.../org/apache/kudu/backup/KuduBackupOptions.scala | 2 +-
.../org/apache/kudu/backup/KuduBackupRDD.scala | 44 ++----
.../kudu/client/AbstractKuduScannerBuilder.java | 13 ++
.../org/apache/kudu/client/AsyncKuduClient.java | 1 +
.../org/apache/kudu/client/AsyncKuduScanner.java | 35 ++++-
.../java/org/apache/kudu/client/KuduScanToken.java | 5 +
.../java/org/apache/kudu/client/KuduScanner.java | 28 +++-
.../apache/kudu/client/KuduScannerIterator.java | 100 +++++++++++++
.../java/org/apache/kudu/client/RowResult.java | 29 ++--
.../org/apache/kudu/client/RowResultIterator.java | 31 ++--
.../src/main/java/org/apache/kudu/util/Slice.java | 3 +
.../test/java/org/apache/kudu/client/ITClient.java | 12 +-
.../kudu/client/TestFlexiblePartitioning.java | 6 +-
.../org/apache/kudu/client/TestKuduScanner.java | 156 +++++++++++++++++++++
.../kudu/mapreduce/KuduTableInputFormat.java | 41 +-----
.../scala/org/apache/kudu/spark/kudu/KuduRDD.scala | 45 ++----
.../apache/kudu/spark/kudu/KuduReadOptions.scala | 4 +-
.../java/org/apache/kudu/test/ClientTestUtil.java | 7 +-
src/kudu/client/client.proto | 6 +-
19 files changed, 412 insertions(+), 156 deletions(-)
diff --git
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
index d366c7c..a1d7519 100644
---
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
+++
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupOptions.scala
@@ -43,7 +43,7 @@ object KuduBackupOptions {
AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS // 30 seconds
val DefaultScanPrefetching
: Boolean = false // TODO: Add a test per KUDU-1260 and enable by default?
- val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
+ val defaultKeepAlivePeriodMs: Long =
AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
// TODO: clean up usage output.
// TODO: timeout configurations.
diff --git
a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
index a5699d8..55f7f98 100644
--- a/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
+++ b/java/kudu-backup/src/main/scala/org/apache/kudu/backup/KuduBackupRDD.scala
@@ -18,8 +18,8 @@ package org.apache.kudu.backup
import java.util.concurrent.TimeUnit
-import org.apache.kudu.Type
import org.apache.kudu.client.AsyncKuduScanner.ReadMode
+import org.apache.kudu.client.KuduScannerIterator.NextRowsCallback
import org.apache.kudu.client._
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.kudu.util.HybridTimeUtil
@@ -67,6 +67,7 @@ class KuduBackupRDD private[kudu] (
.batchSizeBytes(options.scanBatchSize)
.scanRequestTimeout(options.scanRequestTimeoutMs)
.prefetching(options.scanPrefetching)
+ .keepAlivePeriodMs(options.keepAlivePeriodMs)
.build()
tokens.asScala.zipWithIndex.map {
@@ -87,7 +88,9 @@ class KuduBackupRDD private[kudu] (
// TODO: Get deletes and updates for incremental backups.
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
- new RowIterator(scanner, kuduContext, keepAlivePeriodMs)
+ // We don't store the RowResult so we can enable the reuseRowResult
optimization.
+ scanner.setReuseRowResult(true)
+ new RowIterator(scanner, kuduContext)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -106,44 +109,27 @@ private case class KuduBackupPartition(index: Int,
scanToken: Array[Byte], locat
* that takes the job partitions and task context and expects to return an
Iterator[Row].
* This implementation facilitates that.
*/
-private class RowIterator(
- private val scanner: KuduScanner,
- val kuduContext: KuduContext,
- val keepAlivePeriodMs: Long)
+private class RowIterator(private val scanner: KuduScanner, val kuduContext:
KuduContext)
extends Iterator[Row] {
- private var currentIterator: RowResultIterator = RowResultIterator.empty
- private var lastKeepAliveTimeMs = System.currentTimeMillis()
-
- /**
- * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs
has passed.
- */
- private def KeepKuduScannerAlive(): Unit = {
- val now = System.currentTimeMillis
- if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
- scanner.keepAlive()
- lastKeepAliveTimeMs = now
+ private val scannerIterator = scanner.iterator()
+ private val nextRowsCallback = new NextRowsCallback {
+ override def call(numRows: Int): Unit = {
+ if (TaskContext.get().isInterrupted()) {
+ throw new RuntimeException("Kudu task interrupted")
+ }
+
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
}
}
override def hasNext: Boolean = {
- while (!currentIterator.hasNext && scanner.hasMoreRows) {
- if (TaskContext.get().isInterrupted()) {
- throw new RuntimeException("KuduBackup spark task interrupted")
- }
- currentIterator = scanner.nextRows()
- }
- // Update timestampAccumulator with the client's last propagated
- // timestamp on each executor.
-
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
- KeepKuduScannerAlive()
- currentIterator.hasNext
+ scannerIterator.hasNext(nextRowsCallback)
}
// TODO: There may be an old KuduRDD implementation where we did some
// sort of zero copy/object pool pattern for performance (we could use that
here).
override def next(): Row = {
- val rowResult = currentIterator.next()
+ val rowResult = scannerIterator.next()
val columnCount = rowResult.getColumnProjection.getColumnCount
val columns = Array.ofDim[Any](columnCount)
for (i <- 0 until columnCount) {
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 4485eec..10fe8cb 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -56,6 +56,7 @@ public abstract class AbstractKuduScannerBuilder
List<Integer> projectedColumnIndexes = null;
long scanRequestTimeout;
ReplicaSelection replicaSelection = ReplicaSelection.LEADER_ONLY;
+ long keepAlivePeriodMs = AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS;
AbstractKuduScannerBuilder(AsyncKuduClient client, KuduTable table) {
this.client = client;
@@ -359,5 +360,17 @@ public abstract class AbstractKuduScannerBuilder
return (S) this;
}
+ /**
+ * Set the period at which to send keep-alive requests to the tablet
+ * server to ensure that this scanner will not time out.
+ *
+ * @param keepAlivePeriodMs the keep alive period in milliseconds
+ * @return this instance
+ */
+ public S keepAlivePeriodMs(long keepAlivePeriodMs) {
+ this.keepAlivePeriodMs = keepAlivePeriodMs;
+ return (S) this;
+ }
+
public abstract T build();
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 3bdb56b..8f666ed 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -265,6 +265,7 @@ public class AsyncKuduClient implements AutoCloseable {
public static final byte[] EMPTY_ARRAY = new byte[0];
public static final long NO_TIMESTAMP = -1;
public static final long DEFAULT_OPERATION_TIMEOUT_MS = 30000;
+ public static final long DEFAULT_KEEP_ALIVE_PERIOD_MS = 15000; // 25% of the
default scanner ttl.
private static final long MAX_RPC_ATTEMPTS = 100;
/**
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 68a0982..aafe62b 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -205,10 +205,14 @@ public final class AsyncKuduScanner {
private final ReplicaSelection replicaSelection;
+ private final long keepAlivePeriodMs;
+
/////////////////////
// Runtime variables.
/////////////////////
+ private boolean reuseRowResult = false;
+
private boolean closed = false;
private boolean hasMore = true;
@@ -250,7 +254,7 @@ public final class AsyncKuduScanner {
boolean cacheBlocks, boolean prefetching,
byte[] startPrimaryKey, byte[] endPrimaryKey,
long htTimestamp, int batchSizeBytes, PartitionPruner
pruner,
- ReplicaSelection replicaSelection) {
+ ReplicaSelection replicaSelection, long keepAlivePeriodMs) {
checkArgument(batchSizeBytes >= 0, "Need non-negative number of bytes, " +
"got %s", batchSizeBytes);
checkArgument(limit > 0, "Need a strictly positive number for the limit, "
+
@@ -315,6 +319,7 @@ public final class AsyncKuduScanner {
}
this.replicaSelection = replicaSelection;
+ this.keepAlivePeriodMs = keepAlivePeriodMs;
// For READ_YOUR_WRITES scan mode, get the latest observed timestamp
// and store it. Always use this one as the propagated timestamp for
@@ -398,11 +403,30 @@ public final class AsyncKuduScanner {
return this.schema;
}
+ public long getKeepAlivePeriodMs() {
+ return keepAlivePeriodMs;
+ }
+
long getSnapshotTimestamp() {
return this.htTimestamp;
}
/**
+ * If set to true, the {@link RowResult} object returned by the {@link
RowResultIterator}
+ * will be reused with each call to {@link RowResultIterator#next()).
+ * This can be a useful optimization to reduce the number of objects created.
+ *
+ * Note: DO NOT use this if the RowResult is stored between calls to next().
+ * Enabling this optimization means that a call to next() mutates the
previously returned
+ * RowResult. Accessing the previously returned RowResult after a call to
next(), by storing all
+ * RowResults in a collection and accessing them later for example, will
lead to all of the
+ * stored RowResults being mutated as per the data in the last RowResult
returned.
+ */
+ public void setReuseRowResult(boolean reuseRowResult) {
+ this.reuseRowResult = reuseRowResult;
+ }
+
+ /**
* Scans a number of rows.
* <p>
* Once this method returns {@code null} once (which indicates that this
@@ -1037,8 +1061,9 @@ public final class AsyncKuduScanner {
break;
}
}
+ // TODO: Find a clean way to plumb in reuseRowResult.
RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
- timeoutTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
callResponse);
+ timeoutTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
callResponse, reuseRowResult);
boolean hasMore = resp.getHasMoreResults();
if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id))
{
@@ -1094,9 +1119,9 @@ public final class AsyncKuduScanner {
public AsyncKuduScanner build() {
return new AsyncKuduScanner(
client, table, projectedColumnNames, projectedColumnIndexes,
readMode, isFaultTolerant,
- scanRequestTimeout, predicates, limit, cacheBlocks,
- prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
- htTimestamp, batchSizeBytes, PartitionPruner.create(this),
replicaSelection);
+ scanRequestTimeout, predicates, limit, cacheBlocks, prefetching,
lowerBoundPrimaryKey,
+ upperBoundPrimaryKey, htTimestamp, batchSizeBytes,
PartitionPruner.create(this),
+ replicaSelection, keepAlivePeriodMs);
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 7e0e2d9..32618d3 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -261,6 +261,10 @@ public class KuduScanToken implements
Comparable<KuduScanToken> {
builder.scanRequestTimeout(message.getScanRequestTimeoutMs());
}
+ if (message.hasKeepAlivePeriodMs()) {
+ builder.keepAlivePeriodMs(message.getKeepAlivePeriodMs());
+ }
+
return builder.build();
}
@@ -372,6 +376,7 @@ public class KuduScanToken implements
Comparable<KuduScanToken> {
proto.setFaultTolerant(isFaultTolerant);
proto.setBatchSizeBytes(batchSizeBytes);
proto.setScanRequestTimeoutMs(scanRequestTimeout);
+ proto.setKeepAlivePeriodMs(keepAlivePeriodMs);
try {
PartitionPruner pruner = PartitionPruner.create(this);
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index f945d8f..3caeeb1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -28,7 +28,7 @@ import org.apache.kudu.client.AsyncKuduScanner.ReadMode;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class KuduScanner {
+public class KuduScanner implements Iterable<RowResult> {
private final AsyncKuduScanner asyncScanner;
@@ -45,6 +45,21 @@ public class KuduScanner {
}
/**
+ * If set to true, the {@link RowResult} object returned by the {@link
RowResultIterator}
+ * will be reused with each call to {@link RowResultIterator#next()).
+ * This can be a useful optimization to reduce the number of objects created.
+ *
+ * Note: DO NOT use this if the RowResult is stored between calls to next().
+ * Enabling this optimization means that a call to next() mutates the
previously returned
+ * RowResult. Accessing the previously returned RowResult after a call to
next(), by storing all
+ * RowResults in a collection and accessing them later for example, will
lead to all of the
+ * stored RowResults being mutated as per the data in the last RowResult
returned.
+ */
+ public void setReuseRowResult(boolean reuseRowResult) {
+ asyncScanner.setReuseRowResult(reuseRowResult);
+ }
+
+ /**
* Scans a number of rows.
* <p>
* Once this method returns {@code null} once (which indicates that this
@@ -164,6 +179,11 @@ public class KuduScanner {
return asyncScanner.getScanRequestTimeout();
}
+ @Override
+ public KuduScannerIterator iterator() {
+ return new KuduScannerIterator(this, asyncScanner.getKeepAlivePeriodMs());
+ }
+
/**
* A Builder class to build {@link KuduScanner}.
* Use {@link KuduClient#newScannerBuilder} in order to get a builder
instance.
@@ -185,9 +205,9 @@ public class KuduScanner {
public KuduScanner build() {
return new KuduScanner(new AsyncKuduScanner(
client, table, projectedColumnNames, projectedColumnIndexes,
readMode, isFaultTolerant,
- scanRequestTimeout, predicates, limit, cacheBlocks,
- prefetching, lowerBoundPrimaryKey, upperBoundPrimaryKey,
- htTimestamp, batchSizeBytes, PartitionPruner.create(this),
replicaSelection));
+ scanRequestTimeout, predicates, limit, cacheBlocks, prefetching,
lowerBoundPrimaryKey,
+ upperBoundPrimaryKey, htTimestamp, batchSizeBytes,
PartitionPruner.create(this),
+ replicaSelection, keepAlivePeriodMs));
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScannerIterator.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScannerIterator.java
new file mode 100644
index 0000000..9c3ce5e
--- /dev/null
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScannerIterator.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import java.util.Iterator;
+
+/**
+ * An iterator for the RowResults of a KuduScanner.
+ * Exhausting this iterator means that all of the rows from a KuduScanner have
been read.
+ *
+ * This iterator also handles sending keep alive requests to ensure the scanner
+ * does not time out.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class KuduScannerIterator implements Iterator<RowResult> {
+
+ private final KuduScanner scanner;
+ private final long keepAlivePeriodMs;
+
+ private RowResultIterator currentIterator = RowResultIterator.empty();
+ private long lastKeepAliveTimeMs = System.currentTimeMillis();
+
+ KuduScannerIterator(KuduScanner scanner, long keepAlivePeriodMs) {
+ this.scanner = scanner;
+ this.keepAlivePeriodMs = keepAlivePeriodMs;
+ }
+
+ /**
+ * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs
has passed.
+ */
+ private void KeepKuduScannerAlive() throws KuduException {
+ long now = System.currentTimeMillis();
+ if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed())
{
+ scanner.keepAlive();
+ lastKeepAliveTimeMs = now;
+ }
+ }
+
+ /**
+ * Special implementation of hasNext that calls a callback each time
+ * {@link KuduScanner#nextRows} is called.
+ *
+ * @param nextRowsCallback the NextRowsCallback to call
+ * @return {@code true} if the iteration has more elements
+ */
+ @InterfaceAudience.LimitedPrivate("Spark")
+ public boolean hasNext(NextRowsCallback nextRowsCallback) {
+ try {
+ while (!currentIterator.hasNext() && scanner.hasMoreRows()) {
+ currentIterator = scanner.nextRows();
+ if (nextRowsCallback != null) {
+ nextRowsCallback.call(currentIterator.getNumRows());
+ }
+ }
+ KeepKuduScannerAlive();
+ return currentIterator.hasNext();
+ } catch (KuduException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return hasNext(null);
+ }
+
+ @Override
+ public RowResult next() {
+ return currentIterator.next();
+ }
+
+ @InterfaceAudience.LimitedPrivate("Spark")
+ public static abstract class NextRowsCallback {
+
+ /**
+ * @param numRows The number of rows returned from the
+ * {@link KuduScanner#nextRows} call.
+ */
+ public abstract void call(int numRows);
+ }
+}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 842ffdc..dd726aa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -33,7 +33,7 @@ import org.apache.kudu.Type;
import org.apache.kudu.util.Slice;
/**
- * RowResult represents one row from a scanner. Do not reuse or store the
objects.
+ * RowResult represents one row from a scanner.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@@ -41,14 +41,15 @@ public class RowResult {
private static final int INDEX_RESET_LOCATION = -1;
+ private final Schema schema;
+ private final Slice indirectData;
+ private final int rowSize;
+ private final int[] columnOffsets;
+
+ private Slice rowData;
private int index = INDEX_RESET_LOCATION;
private int offset;
private BitSet nullsBitSet;
- private final int rowSize;
- private final int[] columnOffsets;
- private final Schema schema;
- private final Slice rowData;
- private final Slice indirectData;
/**
* Prepares the row representation using the provided data. Doesn't copy data
@@ -56,16 +57,17 @@ public class RowResult {
* @param schema Schema used to build the rowData
* @param rowData The Slice of data returned by the tablet server
* @param indirectData The full indirect data that contains the strings
+ * @param rowIndex The index of the row in the rowData that this RowResult
represents
*/
- RowResult(Schema schema, Slice rowData, Slice indirectData) {
+ RowResult(Schema schema, Slice rowData, Slice indirectData, int rowIndex) {
this.schema = schema;
this.rowData = rowData;
this.indirectData = indirectData;
+ this.rowSize = this.schema.getRowSize();
int columnOffsetsSize = schema.getColumnCount();
if (schema.hasNullableColumns()) {
columnOffsetsSize++;
}
- this.rowSize = this.schema.getRowSize();
columnOffsets = new int[columnOffsetsSize];
// Empty projection, usually used for quick row counting.
if (columnOffsetsSize == 0) {
@@ -81,19 +83,16 @@ public class RowResult {
columnOffsets[i] = previousSize + currentOffset;
currentOffset += previousSize;
}
- }
-
- /**
- * Package-protected, only meant to be used by the RowResultIterator
- */
- void advancePointer() {
- advancePointerTo(this.index + 1);
+ advancePointerTo(rowIndex);
}
void resetPointer() {
advancePointerTo(INDEX_RESET_LOCATION);
}
+ /**
+ * Package-protected, only meant to be used by the RowResultIterator
+ */
void advancePointerTo(int rowIndex) {
this.index = rowIndex;
this.offset = this.rowSize * this.index;
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
index 3ad4155..aba2ba8 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
@@ -37,14 +37,14 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
Iterable<RowResult> {
private static final RowResultIterator EMPTY =
- new RowResultIterator(0, null, null, 0, null, null);
+ new RowResultIterator(0, null, null, 0, null, null, false);
private final Schema schema;
private final Slice bs;
private final Slice indirectBs;
private final int numRows;
- private final RowResult rowResult;
private int currentRow = 0;
+ private final RowResult sharedRowResult;
/**
* Package private constructor, only meant to be instantiated from
AsyncKuduScanner.
@@ -60,24 +60,26 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
Schema schema,
int numRows,
Slice bs,
- Slice indirectBs) {
+ Slice indirectBs,
+ boolean reuseRowResult) {
super(elapsedMillis, tsUUID);
this.schema = schema;
+ this.numRows = numRows;
this.bs = bs;
this.indirectBs = indirectBs;
- this.numRows = numRows;
-
- this.rowResult = numRows == 0 ? null : new RowResult(this.schema, this.bs,
this.indirectBs);
+ this.sharedRowResult = (reuseRowResult && numRows != 0) ?
+ new RowResult(this.schema, this.bs, this.indirectBs, -1) : null;
}
static RowResultIterator makeRowResultIterator(long elapsedMillis,
String tsUUID,
Schema schema,
WireProtocol.RowwiseRowBlockPB data,
- final CallResponse
callResponse)
+ final CallResponse
callResponse,
+ boolean reuseRowResult)
throws KuduException {
if (data == null || data.getNumRows() == 0) {
- return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null,
null);
+ return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null,
null, reuseRowResult);
}
Slice bs = callResponse.getSidecar(data.getRowsSidecar());
@@ -92,7 +94,7 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
" bytes of data but expected " + expectedSize + " for " + numRows +
" rows");
throw new NonRecoverableException(statusIllegalState);
}
- return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs,
indirectBs);
+ return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs,
indirectBs, reuseRowResult);
}
/**
@@ -109,10 +111,13 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
@Override
public RowResult next() {
- // The rowResult keeps track of where it is internally
- this.rowResult.advancePointer();
- this.currentRow++;
- return rowResult;
+ // If sharedRowResult is not null, we should reuse it for every next call.
+ if (sharedRowResult != null) {
+ this.sharedRowResult.advancePointerTo(this.currentRow++);
+ return sharedRowResult;
+ } else {
+ return new RowResult(this.schema, this.bs, this.indirectBs,
this.currentRow++);
+ }
}
@Override
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/util/Slice.java
b/java/kudu-client/src/main/java/org/apache/kudu/util/Slice.java
index 6fc8375..045b225 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/Slice.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/Slice.java
@@ -40,6 +40,9 @@ import org.apache.yetus.audience.InterfaceAudience;
/**
* Little Endian slice of a byte array.
+ *
+ * The slice holds on to a reference of the underlying byte array meaning it
+ * cannot be garbage collected until the Slice itself can be garbage collected.
*/
@InterfaceAudience.Private
public final class Slice implements Comparable<Slice> {
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 5998a27..48ddec5 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -343,17 +343,9 @@ public class ITClient {
.build();
List<RowResult> results = new ArrayList<>();
- while (scanner.hasMoreRows()) {
- try {
- RowResultIterator ite = scanner.nextRows();
- for (RowResult row : ite) {
- results.add(row);
- }
- } catch (KuduException e) {
- return checkAndReportError("Got error while getting row " + key, e);
- }
+ for (RowResult row : scanner) {
+ results.add(row);
}
-
if (results.isEmpty() || results.size() > 1) {
reportError("Random get got 0 or many rows " + results.size() + " for
key " + key, null);
return false;
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestFlexiblePartitioning.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestFlexiblePartitioning.java
index 5931d01..4dddd4d 100644
---
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestFlexiblePartitioning.java
+++
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestFlexiblePartitioning.java
@@ -95,10 +95,8 @@ public class TestFlexiblePartitioning {
private Set<Row> collectRows(KuduScanner scanner) throws KuduException {
Set<Row> rows = new HashSet<>();
- while (scanner.hasMoreRows()) {
- for (RowResult result : scanner.nextRows()) {
- rows.add(Row.fromResult(result));
- }
+ for (RowResult result : scanner) {
+ rows.add(Row.fromResult(result));
}
return rows;
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
new file mode 100644
index 0000000..57a44be
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.kudu.client;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.test.ClientTestUtil;
+import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.RandomUtils;
+import org.apache.kudu.util.DataGenerator;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+public class TestKuduScanner {
+ private static final String tableName = "TestKuduScanner";
+
+ private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
+
+ private KuduClient client;
+
+ @Rule
+ public KuduTestHarness harness = new KuduTestHarness();
+
+ @Before
+ public void setUp() {
+ client = harness.getClient();
+ }
+
+ @Test(timeout = 100000)
+ public void testIterable() throws Exception {
+ KuduTable table = client.createTable(tableName, basicSchema,
getBasicCreateTableOptions());
+ DataGenerator generator = new DataGenerator.DataGeneratorBuilder()
+ .random(RandomUtils.getRandom())
+ .build();
+ KuduSession session = client.newSession();
+ List<Integer> insertKeys = new ArrayList<>();
+ int numRows = 10;
+ for (int i = 0; i < numRows; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ generator.randomizeRow(row);
+ insertKeys.add(row.getInt(0));
+ session.apply(insert);
+ }
+
+ // Ensure that when an enhanced for-loop is used, there's no sharing of
RowResult objects.
+ KuduScanner scanner = client.newScannerBuilder(table).build();
+ Set<RowResult> results = new HashSet<>();
+ Set<Integer> resultKeys = new HashSet<>();
+ for (RowResult rowResult : scanner) {
+ results.add(rowResult);
+ resultKeys.add(rowResult.getInt(0));
+ }
+ assertEquals(numRows, results.size());
+ assertTrue(resultKeys.containsAll(insertKeys));
+
+ // Ensure that when the reuseRowResult optimization is set, only a single
RowResult is used.
+ KuduScanner reuseScanner = client.newScannerBuilder(table).build();
+ reuseScanner.setReuseRowResult(true);
+ Set<RowResult> reuseResult = new HashSet<>();
+ for (RowResult rowResult : reuseScanner) {
+ reuseResult.add(rowResult);
+ }
+ // Ensure the same RowResult object is reused.
+ assertEquals(1, reuseResult.size());
+ }
+
+ @Test(timeout = 100000)
+ @KuduTestHarness.TabletServerConfig(flags = {
+ "--scanner_ttl_ms=5000",
+ "--scanner_gc_check_interval_us=500000"}) // 10% of the TTL.
+ public void testKeepAlive() throws Exception {
+ int rowCount = 500;
+ int shortScannerTtlMs = 5000;
+
+ // Create a simple table with a single partition.
+ Schema tableSchema = new Schema(Collections.singletonList(
+ new ColumnSchema.ColumnSchemaBuilder("key",
Type.INT32).key(true).build()
+ ));
+
+ CreateTableOptions tableOptions = new CreateTableOptions()
+ .setRangePartitionColumns(Collections.singletonList("key"))
+ .setNumReplicas(1);
+ KuduTable table = client.createTable(tableName, tableSchema, tableOptions);
+
+ KuduSession session = client.newSession();
+ for (int i = 0; i < rowCount; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addInt(0, i);
+ session.apply(insert);
+ }
+
+ // Test that a keepAlivePeriodMs less than the scanner ttl is successful.
+ KuduScanner goodScanner = client.newScannerBuilder(table)
+ .batchSizeBytes(100) // Set a small batch size so the first scan
doesn't read all the rows.
+ .keepAlivePeriodMs(shortScannerTtlMs / 4)
+ .build();
+ processKeepAliveScanner(goodScanner, shortScannerTtlMs);
+
+ // Test that a keepAlivePeriodMs greater than the scanner ttl fails.
+ KuduScanner badScanner = client.newScannerBuilder(table)
+ .batchSizeBytes(100) // Set a small batch size so the first scan
doesn't read all the rows.
+ .keepAlivePeriodMs(shortScannerTtlMs * 2)
+ .build();
+ try {
+ processKeepAliveScanner(badScanner, shortScannerTtlMs);
+ fail("Should throw a scanner not found exception");
+ } catch (RuntimeException ex) {
+ assertTrue(ex.getMessage().matches(".*Scanner .* not found.*"));
+ }
+ }
+
+ private void processKeepAliveScanner(KuduScanner scanner, int scannerTtlMs)
throws Exception {
+ int i = 0;
+ // Ensure reading takes longer than the scanner ttl.
+ for (RowResult rowResult : scanner) {
+ // Sleep for half the ttl for the first few rows. This ensures
+ // we are on the same tablet and will go past the ttl without
+ // a new scan request. It also ensures a single row doesn't go
+ // longer than the ttl.
+ if (i < 5) {
+ Thread.sleep(scannerTtlMs / 2); // Sleep for half the ttl.
+ i++;
+ }
+ }
+ }
+}
diff --git
a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
index c39e715..9857e18 100644
---
a/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
+++
b/java/kudu-mapreduce/src/main/java/org/apache/kudu/mapreduce/KuduTableInputFormat.java
@@ -15,16 +15,15 @@
package org.apache.kudu.mapreduce;
-import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.naming.NamingException;
@@ -48,7 +47,6 @@ import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.client.AsyncKuduClient;
import org.apache.kudu.client.Bytes;
@@ -59,7 +57,6 @@ import org.apache.kudu.client.KuduScanner;
import org.apache.kudu.client.KuduTable;
import org.apache.kudu.client.LocatedTablet;
import org.apache.kudu.client.RowResult;
-import org.apache.kudu.client.RowResultIterator;
/**
* <p>
@@ -401,7 +398,7 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
private final NullWritable currentKey = NullWritable.get();
private RowResult currentValue;
- private RowResultIterator iterator;
+ private Iterator<RowResult> iterator;
private KuduScanner scanner;
private TableSplit split;
private KuduClient kuduClient;
@@ -418,48 +415,18 @@ public class KuduTableInputFormat extends InputFormat<NullWritable, RowResult>
LOG.debug("Creating scanner for token: {}",
KuduScanToken.stringifySerializedToken(split.getScanToken(),
kuduClient));
scanner = KuduScanToken.deserializeIntoScanner(split.getScanToken(),
kuduClient);
-
- // Calling this now to set iterator.
- tryRefreshIterator();
+ iterator = scanner.iterator();
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!iterator.hasNext()) {
- tryRefreshIterator();
- if (!iterator.hasNext()) {
- // Means we still have the same iterator, we're done
- return false;
- }
+ return false;
}
currentValue = iterator.next();
return true;
}
- /**
- * If the scanner has more rows, get a new iterator else don't do anything.
- * @throws IOException
- */
- private void tryRefreshIterator() throws IOException {
- if (!scanner.hasMoreRows()) {
- return;
- }
- try {
- // scanner.nextRows() sometimes returns an empty RowResultIterator, but
- // scanner.hasMoreRows() returns still true, so we need to continue
- // iterating on the scanner until scanner.hasMoreRows() returns false.
- //
- // TODO (qqzhang) In future, the backend can guarantee that
- // TabletService.Scan() would not return the empty results, we need to
- // remove the loop.
- do {
- iterator = scanner.nextRows();
- } while (!iterator.hasNext() && scanner.hasMoreRows());
- } catch (Exception e) {
- throw new IOException("Couldn't get scan data", e);
- }
- }
-
@Override
public NullWritable getCurrentKey() throws IOException,
InterruptedException {
return currentKey;
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
index 78670a4..92a8515 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduRDD.scala
@@ -17,7 +17,6 @@
package org.apache.kudu.spark.kudu
import scala.collection.JavaConverters._
-
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.Partition
@@ -26,10 +25,9 @@ import org.apache.spark.TaskContext
import org.apache.spark.util.LongAccumulator
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
-
-import org.apache.kudu.client._
-import org.apache.kudu.Type
import org.apache.kudu.client
+import org.apache.kudu.client._
+import org.apache.kudu.client.KuduScannerIterator.NextRowsCallback
/**
* A Resilient Distributed Dataset backed by a Kudu table.
@@ -60,6 +58,7 @@ class KuduRDD private[kudu] (
.batchSizeBytes(options.batchSize)
.setProjectedColumnNames(projectedCols.toSeq.asJava)
.setFaultTolerant(options.faultTolerantScanner)
+ .keepAlivePeriodMs(options.keepAlivePeriodMs)
.cacheBlocks(true)
// A scan is partitioned to multiple ones. If scan locality is enabled,
@@ -100,7 +99,9 @@ class KuduRDD private[kudu] (
val partition: KuduPartition = part.asInstanceOf[KuduPartition]
val scanner =
KuduScanToken.deserializeIntoScanner(partition.scanToken, client)
- new RowIterator(scanner, kuduContext, keepAlivePeriodMs, rowsRead)
+ // We don't store the RowResult so we can enable the reuseRowResult
optimization.
+ scanner.setReuseRowResult(true)
+ new RowIterator(scanner, kuduContext, rowsRead)
}
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -121,47 +122,31 @@ private class KuduPartition(
* A Spark SQL [[Row]] iterator which wraps a [[KuduScanner]].
* @param scanner the wrapped scanner
* @param kuduContext the kudu context
- * @param keepAlivePeriodMs the period in which to call the keepAlive on the scanners
* @param rowsRead an accumulator to track the number of rows read from Kudu
*/
private class RowIterator(
val scanner: KuduScanner,
val kuduContext: KuduContext,
- val keepAlivePeriodMs: Long,
val rowsRead: LongAccumulator)
extends Iterator[Row] {
- private var currentIterator: RowResultIterator = RowResultIterator.empty
- private var lastKeepAliveTimeMs = System.currentTimeMillis()
-
- /**
- * Calls the keepAlive API on the current scanner if the keepAlivePeriodMs has passed.
- */
- private def KeepKuduScannerAlive(): Unit = {
- val now = System.currentTimeMillis
- if (now >= lastKeepAliveTimeMs + keepAlivePeriodMs && !scanner.isClosed) {
- scanner.keepAlive()
- lastKeepAliveTimeMs = now
- }
- }
-
- override def hasNext: Boolean = {
- while (!currentIterator.hasNext && scanner.hasMoreRows) {
+ private val scannerIterator = scanner.iterator()
+ private val nextRowsCallback = new NextRowsCallback {
+ override def call(numRows: Int): Unit = {
if (TaskContext.get().isInterrupted()) {
throw new RuntimeException("Kudu task interrupted")
}
- currentIterator = scanner.nextRows()
- // Update timestampAccumulator with the client's last propagated
- // timestamp on each executor.
kuduContext.timestampAccumulator.add(kuduContext.syncClient.getLastPropagatedTimestamp)
- rowsRead.add(currentIterator.getNumRows)
+ rowsRead.add(numRows)
}
- KeepKuduScannerAlive()
- currentIterator.hasNext
+ }
+
+ override def hasNext: Boolean = {
+ scannerIterator.hasNext(nextRowsCallback)
}
override def next(): Row = {
- val rowResult = currentIterator.next()
+ val rowResult = scannerIterator.next()
val columnCount = rowResult.getColumnProjection.getColumnCount
val columns = Array.ofDim[Any](columnCount)
for (i <- 0 until columnCount) {
diff --git a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
index 82c3af8..afa0b2f 100644
--- a/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
+++ b/java/kudu-spark/src/main/scala/org/apache/kudu/spark/kudu/KuduReadOptions.scala
@@ -19,7 +19,7 @@ package org.apache.kudu.spark.kudu
import org.apache.yetus.audience.InterfaceAudience
import org.apache.yetus.audience.InterfaceStability
-
+import org.apache.kudu.client.AsyncKuduClient
import org.apache.kudu.client.ReplicaSelection
import org.apache.kudu.spark.kudu.KuduReadOptions._
@@ -50,5 +50,5 @@ object KuduReadOptions {
val defaultBatchSize: Int = 1024 * 1024 * 20 // TODO: Understand/doc this
setting?
val defaultScanLocality: ReplicaSelection = ReplicaSelection.CLOSEST_REPLICA
val defaultFaultTolerantScanner: Boolean = false
- val defaultKeepAlivePeriodMs: Long = 15000 // 25% of the default scanner ttl.
+ val defaultKeepAlivePeriodMs: Long =
AsyncKuduClient.DEFAULT_KEEP_ALIVE_PERIOD_MS
}
diff --git a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
index a93ee83..38ffff7 100644
--- a/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
+++ b/java/kudu-test-utils/src/main/java/org/apache/kudu/test/ClientTestUtil.java
@@ -184,11 +184,8 @@ public abstract class ClientTestUtil {
scanBuilder.addPredicate(predicate);
}
KuduScanner scanner = scanBuilder.build();
- while (scanner.hasMoreRows()) {
- RowResultIterator rows = scanner.nextRows();
- for (RowResult r : rows) {
- rowStrings.add(r.rowToString());
- }
+ for (RowResult r : scanner) {
+ rowStrings.add(r.rowToString());
}
Collections.sort(rowStrings);
return rowStrings;
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index 140c430..edb3f00 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -77,7 +77,7 @@ message ScanTokenPB {
// See common.proto for further information about read modes.
optional ReadMode read_mode = 10 [default = READ_LATEST];
- // The requested snapshot timestamp. This is only used
+ // The requested snapshot timestamp. This is only used
// when the read mode is set to READ_AT_SNAPSHOT.
optional fixed64 snap_timestamp = 11;
@@ -106,6 +106,10 @@ message ScanTokenPB {
// server can take. If not set, the default value is controlled by the client
// scanner implementation.
optional int64 scan_request_timeout_ms = 17;
+
+ // The period, in milliseconds, at which to send keep-alive requests to the tablet
+ // server to ensure this scanner won't time out.
+ optional int64 keep_alive_period_ms = 18;
}
// All of the data necessary to authenticate to a cluster from a client with