This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 5e78e4a [java] Add private diff scan support
5e78e4a is described below
commit 5e78e4a5c0a100a6f9e0c0d6cb84450a4abfada1
Author: Grant Henke <[email protected]>
AuthorDate: Fri Mar 1 08:46:55 2019 -0600
[java] Add private diff scan support
Adds a private diff scan API to the Java client. This is
achieved primarily by adding a
`diffScan(long startTimestamp, long endTimestamp)`
method to AbstractKuduScannerBuilder,
and then configuring the scan RPC to include the start and end timestamps.
The column projection of the scan will include an extra
`IS_DELETED` column when a diff scan is performed.
The value of the `IS_DELETED` virtual column can be
retrieved via `RowResult.isDeleted()`.
Change-Id: I51b01ae76cd22407df9097b56629ac7262ec2964
Reviewed-on: http://gerrit.cloudera.org:8080/12689
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <[email protected]>
Reviewed-by: Mike Percy <[email protected]>
---
.../main/java/org/apache/kudu/ColumnSchema.java | 58 ++++-
.../src/main/java/org/apache/kudu/Schema.java | 26 ++-
.../src/main/java/org/apache/kudu/Type.java | 3 +-
.../kudu/client/AbstractKuduScannerBuilder.java | 20 ++
.../org/apache/kudu/client/AsyncKuduScanner.java | 84 +++++--
.../java/org/apache/kudu/client/KuduScanToken.java | 16 +-
.../java/org/apache/kudu/client/KuduScanner.java | 4 +-
.../java/org/apache/kudu/client/Operation.java | 4 +-
.../java/org/apache/kudu/client/PartialRow.java | 3 +-
.../org/apache/kudu/client/ProtobufHelper.java | 3 +-
.../java/org/apache/kudu/client/RowResult.java | 57 +++--
.../java/org/apache/kudu/util/DataGenerator.java | 12 +
.../java/org/apache/kudu/util/TimestampUtil.java | 11 +
.../org/apache/kudu/client/TestKuduScanner.java | 255 ++++++++++++++++++++-
src/kudu/client/client.proto | 8 +-
15 files changed, 509 insertions(+), 55 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
index 212ce78..1b6846e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/ColumnSchema.java
@@ -43,6 +43,7 @@ public class ColumnSchema {
private final CompressionAlgorithm compressionAlgorithm;
private final ColumnTypeAttributes typeAttributes;
private final int typeSize;
+ private final Common.DataType wireType;
/**
* Specifies the encoding of data for a column on disk.
@@ -100,7 +101,8 @@ public class ColumnSchema {
private ColumnSchema(String name, Type type, boolean key, boolean nullable,
Object defaultValue, int desiredBlockSize, Encoding
encoding,
- CompressionAlgorithm compressionAlgorithm,
ColumnTypeAttributes typeAttributes) {
+ CompressionAlgorithm compressionAlgorithm,
+ ColumnTypeAttributes typeAttributes, Common.DataType
wireType) {
this.name = name;
this.type = type;
this.key = key;
@@ -111,6 +113,7 @@ public class ColumnSchema {
this.compressionAlgorithm = compressionAlgorithm;
this.typeAttributes = typeAttributes;
this.typeSize = type.getSize(typeAttributes);
+ this.wireType = wireType;
}
/**
@@ -186,6 +189,14 @@ public class ColumnSchema {
}
/**
+ * Get the column's underlying DataType.
+ */
+ @InterfaceAudience.Private
+ public Common.DataType getWireType() {
+ return wireType;
+ }
+
+ /**
* The size of this type in bytes on the wire.
* @return A size
*/
@@ -233,10 +244,11 @@ public class ColumnSchema {
private boolean key = false;
private boolean nullable = false;
private Object defaultValue = null;
- private int blockSize = 0;
+ private int desiredBlockSize = 0;
private Encoding encoding = null;
private CompressionAlgorithm compressionAlgorithm = null;
private ColumnTypeAttributes typeAttributes = null;
+ private Common.DataType wireType = null;
/**
* Constructor for the required parameters.
@@ -249,6 +261,23 @@ public class ColumnSchema {
}
/**
+ * Constructor to copy an existing columnSchema
+ * @param that the columnSchema to copy
+ */
+ public ColumnSchemaBuilder(ColumnSchema that) {
+ this.name = that.name;
+ this.type = that.type;
+ this.key = that.key;
+ this.nullable = that.nullable;
+ this.defaultValue = that.defaultValue;
+ this.desiredBlockSize = that.desiredBlockSize;
+ this.encoding = that.encoding;
+ this.compressionAlgorithm = that.compressionAlgorithm;
+ this.typeAttributes = that.typeAttributes;
+ this.wireType = that.wireType;
+ }
+
+ /**
* Sets if the column is part of the row key. False by default.
* @param key a boolean that indicates if the column is part of the key
* @return this instance
@@ -300,12 +329,12 @@ public class ColumnSchema {
*
* It's recommended that this not be set any lower than 4096 (4KB) or
higher
* than 1048576 (1MB).
- * @param blockSize the desired block size, in bytes
+ * @param desiredBlockSize the desired block size, in bytes
* @return this instance
* <!-- TODO(KUDU-1107): move the above info to docs -->
*/
- public ColumnSchemaBuilder desiredBlockSize(int blockSize) {
- this.blockSize = blockSize;
+ public ColumnSchemaBuilder desiredBlockSize(int desiredBlockSize) {
+ this.desiredBlockSize = desiredBlockSize;
return this;
}
@@ -340,13 +369,30 @@ public class ColumnSchema {
}
/**
+ * Allows an alternate {@link Common.DataType} to override the {@link Type}
+ * when serializing the ColumnSchema on the wire.
+ * This is useful for virtual columns specified by their type such as
+ * {@link Common.DataType#IS_DELETED}.
+ */
+ @InterfaceAudience.Private
+ public ColumnSchemaBuilder wireType(Common.DataType wireType) {
+ this.wireType = wireType;
+ return this;
+ }
+
+ /**
* Builds a {@link ColumnSchema} using the passed parameters.
* @return a new {@link ColumnSchema}
*/
public ColumnSchema build() {
+ // Set the wire type if it wasn't explicitly set.
+ if (wireType == null) {
+ this.wireType = type.getDataType(typeAttributes);
+ }
return new ColumnSchema(name, type,
key, nullable, defaultValue,
- blockSize, encoding, compressionAlgorithm,
typeAttributes);
+ desiredBlockSize, encoding, compressionAlgorithm,
+ typeAttributes, wireType);
}
}
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
index 8e19f38..f5d746e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Schema.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import org.apache.kudu.Common.DataType;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -66,6 +68,9 @@ public class Schema {
private final int rowSize;
private final boolean hasNullableColumns;
+ private final int isDeletedIndex;
+ private static final int NO_IS_DELETED_INDEX = -1;
+
/**
* Constructs a schema using the specified columns and does some internal
accounting
*
@@ -102,6 +107,7 @@ public class Schema {
this.columnsById = hasColumnIds ? new HashMap<Integer,
Integer>(columnIds.size()) : null;
int offset = 0;
boolean hasNulls = false;
+ int isDeletedIndex = NO_IS_DELETED_INDEX;
// pre-compute a few counts and offsets
for (int index = 0; index < columns.size(); index++) {
final ColumnSchema column = columns.get(index);
@@ -126,11 +132,17 @@ public class Schema {
String.format("Column IDs must be unique: %s", columnIds));
}
}
+
+ // If this is the IS_DELETED virtual column, set `hasIsDeleted` and
`isDeletedIndex`.
+ if (column.getWireType() == DataType.IS_DELETED) {
+ isDeletedIndex = index;
+ }
}
- this.hasNullableColumns = hasNulls;
this.varLengthColumnCount = varLenCnt;
this.rowSize = getRowSize(this.columnsByIndex);
+ this.hasNullableColumns = hasNulls;
+ this.isDeletedIndex = isDeletedIndex;
}
/**
@@ -293,4 +305,16 @@ public class Schema {
public PartialRow newPartialRow() {
return new PartialRow(this);
}
+
+ /**
+ * @return the index of the IS_DELETED virtual column
+ * @throws IllegalStateException if no IS_DELETED virtual column exists
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public int getIsDeletedIndex() {
+ Preconditions.checkState(isDeletedIndex != NO_IS_DELETED_INDEX,
+ "Schema doesn't have an IS_DELETED columns");
+ return isDeletedIndex;
+ }
}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
index c3b2a91..ec7d542 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/Type.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/Type.java
@@ -29,7 +29,6 @@ import com.google.common.primitives.Shorts;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.util.DecimalUtil;
/**
@@ -147,6 +146,7 @@ public enum Type {
return 8 + 8; // offset then string length
case BOOL:
case INT8:
+ case IS_DELETED:
return 1;
case INT16:
return Shorts.BYTES;
@@ -183,6 +183,7 @@ public enum Type {
case DECIMAL64:
case DECIMAL128:
return DECIMAL;
+ case IS_DELETED: return BOOL;
default:
throw new IllegalArgumentException("The provided data type doesn't
map" +
" to know any known one: " +
type.getDescriptorForType().getFullName());
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 10fe8cb..5865876 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -47,6 +47,7 @@ public abstract class AbstractKuduScannerBuilder
long limit = Long.MAX_VALUE;
boolean prefetching = false;
boolean cacheBlocks = true;
+ long startTimestamp = AsyncKuduClient.NO_TIMESTAMP;
long htTimestamp = AsyncKuduClient.NO_TIMESTAMP;
byte[] lowerBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
byte[] upperBoundPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
@@ -260,6 +261,25 @@ public abstract class AbstractKuduScannerBuilder
}
/**
+ * Sets the start timestamp and end timestamp for a diff scan.
+ * The timestamps should be encoded HT timestamps.
+ *
+ * Additionally sets any other scan properties required by diff scans.
+ *
+ * @param startTimestamp a long representing a HybridTime-encoded start
timestamp
+ * @param endTimestamp a long representing a HybridTime-encoded end timestamp
+ * @return this instance
+ */
+ @InterfaceAudience.Private
+ public S diffScan(long startTimestamp, long endTimestamp) {
+ this.startTimestamp = startTimestamp;
+ this.htTimestamp = endTimestamp;
+ this.isFaultTolerant = true;
+ this.readMode = AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT;
+ return (S) this;
+ }
+
+ /**
* Sets how long each scan request to a server can last.
* Defaults to {@link KuduClient#getDefaultOperationTimeoutMs()}.
* @param scanRequestTimeout a long representing time in milliseconds
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index aafe62b..7662bb5 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -44,6 +44,7 @@ import com.google.protobuf.UnsafeByteOperations;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import org.apache.kudu.security.Token;
+import org.apache.kudu.Type;
import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
import org.apache.yetus.audience.InterfaceAudience;
@@ -149,6 +150,11 @@ public final class AsyncKuduScanner {
}
}
+ // This is private because it is not safe to use this column name as it may
be
+ // different in the case of collisions. Instead the `IS_DELETED` column
should
+ // be looked up by type.
+ static final String DEFAULT_IS_DELETED_COL_NAME = "is_deleted";
+
//////////////////////////
// Initial configurations.
//////////////////////////
@@ -199,6 +205,8 @@ public final class AsyncKuduScanner {
private final boolean isFaultTolerant;
+ private final long startTimestamp;
+
private long htTimestamp;
private long lowerBoundPropagationTimestamp = AsyncKuduClient.NO_TIMESTAMP;
@@ -253,7 +261,8 @@ public final class AsyncKuduScanner {
Map<String, KuduPredicate> predicates, long limit,
boolean cacheBlocks, boolean prefetching,
byte[] startPrimaryKey, byte[] endPrimaryKey,
- long htTimestamp, int batchSizeBytes, PartitionPruner
pruner,
+ long startTimestamp, long htTimestamp,
+ int batchSizeBytes, PartitionPruner pruner,
ReplicaSelection replicaSelection, long keepAlivePeriodMs) {
checkArgument(batchSizeBytes >= 0, "Need non-negative number of bytes, " +
"got %s", batchSizeBytes);
@@ -286,29 +295,32 @@ public final class AsyncKuduScanner {
this.prefetching = prefetching;
this.startPrimaryKey = startPrimaryKey;
this.endPrimaryKey = endPrimaryKey;
+ this.startTimestamp = startTimestamp;
this.htTimestamp = htTimestamp;
this.batchSizeBytes = batchSizeBytes;
this.lastPrimaryKey = AsyncKuduClient.EMPTY_ARRAY;
// Map the column names to actual columns in the table schema.
// If the user set this to 'null', we scan all columns.
+ List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
if (projectedNames != null) {
- List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
for (String columnName : projectedNames) {
ColumnSchema originalColumn = table.getSchema().getColumn(columnName);
columns.add(getStrippedColumnSchema(originalColumn));
}
- this.schema = new Schema(columns);
} else if (projectedIndexes != null) {
- List<ColumnSchema> columns = new ArrayList<ColumnSchema>();
for (Integer columnIndex : projectedIndexes) {
ColumnSchema originalColumn =
table.getSchema().getColumnByIndex(columnIndex);
columns.add(getStrippedColumnSchema(originalColumn));
}
- this.schema = new Schema(columns);
} else {
- this.schema = table.getSchema();
+ columns.addAll(table.getSchema().getColumns());
}
+ // This is a diff scan so add the IS_DELETED column.
+ if (startTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ columns.add(generateIsDeletedColumn(table.getSchema()));
+ }
+ this.schema = new Schema(columns);
// If the partition pruner has pruned all partitions, then the scan can be
// short circuited without contacting any tablet servers.
@@ -330,14 +342,44 @@ public final class AsyncKuduScanner {
}
/**
- * Clone the given column schema instance. The new instance will include
only the name, type, and
- * nullability of the passed one.
+ * Generates and returns a ColumnSchema for the virtual IS_DELETED column.
+ * The column name is generated to ensure there is never a collision.
+ *
+ * @param schema the table schema
+ * @return a ColumnSchema for the virtual IS_DELETED column
+ */
+ private static ColumnSchema generateIsDeletedColumn(Schema schema) {
+ String columnName = DEFAULT_IS_DELETED_COL_NAME;
+ boolean collision = true;
+ while (collision) {
+ try {
+ // If getColumnIndex doesn't throw an IllegalArgumentException then
+ // the column already exists and we need to pick an alternate
IS_DELETED column name.
+ schema.getColumnIndex(columnName);
+ columnName += "_";
+ } catch (IllegalArgumentException ex) {
+ collision = false;
+ }
+ }
+ return new ColumnSchema.ColumnSchemaBuilder(columnName, Type.BOOL)
+ .wireType(Common.DataType.IS_DELETED)
+ .defaultValue(false)
+ .nullable(false)
+ .key(false)
+ .build();
+ }
+
+ /**
+ * Sets isKey to false on the passed ColumnSchema.
+ * This allows out of order key columns in projections.
+ *
+ * TODO: Remove the need for this by handling server side.
+ *
* @return a new column schema
*/
private static ColumnSchema getStrippedColumnSchema(ColumnSchema
columnToClone) {
- return new ColumnSchema.ColumnSchemaBuilder(columnToClone.getName(),
columnToClone.getType())
- .nullable(columnToClone.isNullable())
- .typeAttributes(columnToClone.getTypeAttributes())
+ return new ColumnSchema.ColumnSchemaBuilder(columnToClone)
+ .key(false)
.build();
}
@@ -407,6 +449,10 @@ public final class AsyncKuduScanner {
return keepAlivePeriodMs;
}
+ long getStartSnaphshotTimestamp() {
+ return this.startTimestamp;
+ }
+
long getSnapshotTimestamp() {
return this.htTimestamp;
}
@@ -983,10 +1029,14 @@ public final class AsyncKuduScanner {
}
newBuilder.setReadMode(AsyncKuduScanner.this.getReadMode().pbVersion());
- // if the mode is set to read on snapshot sent the snapshot timestamp
- if (AsyncKuduScanner.this.getReadMode() == ReadMode.READ_AT_SNAPSHOT
&&
- AsyncKuduScanner.this.getSnapshotTimestamp() !=
AsyncKuduClient.NO_TIMESTAMP) {
-
newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
+ // if the mode is set to read on snapshot set the snapshot
timestamps.
+ if (AsyncKuduScanner.this.getReadMode() ==
ReadMode.READ_AT_SNAPSHOT) {
+ if (AsyncKuduScanner.this.getSnapshotTimestamp() !=
AsyncKuduClient.NO_TIMESTAMP) {
+
newBuilder.setSnapTimestamp(AsyncKuduScanner.this.getSnapshotTimestamp());
+ }
+ if (AsyncKuduScanner.this.getStartSnaphshotTimestamp() !=
AsyncKuduClient.NO_TIMESTAMP) {
+
newBuilder.setSnapStartTimestamp(AsyncKuduScanner.this.getStartSnaphshotTimestamp());
+ }
}
if (isFaultTolerant) {
@@ -1120,8 +1170,8 @@ public final class AsyncKuduScanner {
return new AsyncKuduScanner(
client, table, projectedColumnNames, projectedColumnIndexes,
readMode, isFaultTolerant,
scanRequestTimeout, predicates, limit, cacheBlocks, prefetching,
lowerBoundPrimaryKey,
- upperBoundPrimaryKey, htTimestamp, batchSizeBytes,
PartitionPruner.create(this),
- replicaSelection, keepAlivePeriodMs);
+ upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
+ PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs);
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 32618d3..db79e14 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -212,6 +212,10 @@ public class KuduScanToken implements
Comparable<KuduScanToken> {
if (message.hasSnapTimestamp()) {
builder.snapshotTimestampRaw(message.getSnapTimestamp());
}
+ // Set the diff scan timestamps if they are set.
+ if (message.hasSnapStartTimestamp()) {
+ builder.diffScan(message.getSnapStartTimestamp(),
message.getSnapTimestamp());
+ }
break;
}
case READ_LATEST: {
@@ -366,10 +370,14 @@ public class KuduScanToken implements
Comparable<KuduScanToken> {
proto.setPropagatedTimestamp(client.getLastPropagatedTimestamp());
}
- // If the mode is set to read on snapshot set the snapshot timestamp.
- if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT &&
- htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
- proto.setSnapTimestamp(htTimestamp);
+ // If the mode is set to read on snapshot set the snapshot timestamps.
+ if (readMode == AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT) {
+ if (htTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ proto.setSnapTimestamp(htTimestamp);
+ }
+ if (startTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ proto.setSnapStartTimestamp(startTimestamp);
+ }
}
proto.setCacheBlocks(cacheBlocks);
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 3caeeb1..4e8daa7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -206,8 +206,8 @@ public class KuduScanner implements Iterable<RowResult> {
return new KuduScanner(new AsyncKuduScanner(
client, table, projectedColumnNames, projectedColumnIndexes,
readMode, isFaultTolerant,
scanRequestTimeout, predicates, limit, cacheBlocks, prefetching,
lowerBoundPrimaryKey,
- upperBoundPrimaryKey, htTimestamp, batchSizeBytes,
PartitionPruner.create(this),
- replicaSelection, keepAlivePeriodMs));
+ upperBoundPrimaryKey, startTimestamp, htTimestamp, batchSizeBytes,
+ PartitionPruner.create(this), replicaSelection, keepAlivePeriodMs));
}
}
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index 726a2dc..200e8e2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -355,7 +355,9 @@ public abstract class Operation extends
KuduRpc<OperationResponse> {
columnCount = row.getSchema().getPrimaryKeyColumnCount();
// Clear the bits indicating any non-key fields are set.
columnsBitSet.clear(schema.getPrimaryKeyColumnCount(),
columnsBitSet.size() - 1);
- nullsBitSet.clear(schema.getPrimaryKeyColumnCount(),
nullsBitSet.size() - 1);
+ if (schema.hasNullableColumns()) {
+ nullsBitSet.clear(schema.getPrimaryKeyColumnCount(),
nullsBitSet.size() - 1);
+ }
}
rows.put(type.toEncodedByte());
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
index c7c64bd..261d42a 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/PartialRow.java
@@ -1515,7 +1515,8 @@ public class PartialRow {
int offset = lower.getSchema().getColumnOffset(index);
switch (type) {
- case BOOL: return lower.rowAlloc[offset] + 1 == upper.rowAlloc[offset];
+ case BOOL:
+ return lower.rowAlloc[offset] + 1 == upper.rowAlloc[offset];
case INT8: {
byte val = lower.rowAlloc[offset];
return val != Byte.MAX_VALUE && val + 1 == upper.rowAlloc[offset];
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index 432ad28..bbb58ce 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -71,10 +71,11 @@ public class ProtobufHelper {
ColumnSchema column) {
schemaBuilder
.setName(column.getName())
- .setType(column.getType().getDataType(column.getTypeAttributes()))
+ .setType(column.getWireType())
.setIsKey(column.isKey())
.setIsNullable(column.isNullable())
.setCfileBlockSize(column.getDesiredBlockSize());
+
if (column.getEncoding() != null) {
schemaBuilder.setEncoding(column.getEncoding().getInternalPbType());
}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index dd726aa..7f80670 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -20,6 +20,7 @@ package org.apache.kudu.client;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
+import java.util.Arrays;
import java.util.BitSet;
import org.apache.kudu.util.TimestampUtil;
@@ -248,10 +249,8 @@ public class RowResult {
public long getLong(int columnIndex) {
checkValidColumn(columnIndex);
checkNull(columnIndex);
- // Can't check type because this could be a long, string, or Timestamp.
- return Bytes.getLong(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
+ checkType(columnIndex, Type.INT64, Type.UNIXTIME_MICROS);
+ return getLongOrOffset(columnIndex);
}
/**
@@ -366,10 +365,7 @@ public class RowResult {
checkValidColumn(columnIndex);
checkNull(columnIndex);
checkType(columnIndex, Type.UNIXTIME_MICROS);
- ColumnSchema column = schema.getColumnByIndex(columnIndex);
- long micros = Bytes.getLong(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
+ long micros = getLongOrOffset(columnIndex);
return TimestampUtil.microsToTimestamp(micros);
}
@@ -405,8 +401,8 @@ public class RowResult {
checkValidColumn(columnIndex);
checkNull(columnIndex);
checkType(columnIndex, Type.STRING);
- // C++ puts a Slice in rowData which is 16 bytes long for simplity, but we
only support ints.
- long offset = getLong(columnIndex);
+ // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but
we only support ints.
+ long offset = getLongOrOffset(columnIndex);
long length =
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
assert offset < Integer.MAX_VALUE;
assert length < Integer.MAX_VALUE;
@@ -441,7 +437,7 @@ public class RowResult {
checkNull(columnIndex);
// C++ puts a Slice in rowData which is 16 bytes long for simplicity,
// but we only support ints.
- long offset = getLong(columnIndex);
+ long offset = getLongOrOffset(columnIndex);
long length =
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
assert offset < Integer.MAX_VALUE;
assert length < Integer.MAX_VALUE;
@@ -483,7 +479,7 @@ public class RowResult {
checkType(columnIndex, Type.BINARY);
// C++ puts a Slice in rowData which is 16 bytes long for simplicity,
// but we only support ints.
- long offset = getLong(columnIndex);
+ long offset = getLongOrOffset(columnIndex);
long length =
rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
assert offset < Integer.MAX_VALUE;
assert length < Integer.MAX_VALUE;
@@ -492,6 +488,18 @@ public class RowResult {
}
/**
+ * Returns the long column value if the column type is INT64 or
UNIXTIME_MICROS.
+ * Returns the column's offset into the indirectData if the column type is
BINARY or STRING.
+ * @param columnIndex Column index in the schema
+ * @return a long value for the column
+ */
+ long getLongOrOffset(int columnIndex) {
+ return Bytes.getLong(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
* Get if the specified column is NULL
* @param columnName name of the column in the schema
* @return true if the column cell is null and the column is nullable,
@@ -589,6 +597,16 @@ public class RowResult {
}
/**
+ * @return the value of the IS_DELETED virtual column
+ * @throws IllegalStateException if no IS_DELETED virtual column exists
+ */
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ public boolean isDeleted() {
+ return getBoolean(schema.getIsDeletedIndex());
+ }
+
+ /**
* Get the type of a column in this result.
* @param columnName name of the column
* @return a type
@@ -639,14 +657,17 @@ public class RowResult {
}
}
- private void checkType(int columnIndex, Type expectedType) {
+ private void checkType(int columnIndex, Type... types) {
ColumnSchema columnSchema = schema.getColumnByIndex(columnIndex);
Type columnType = columnSchema.getType();
- if (!columnType.equals(expectedType)) {
- throw new IllegalArgumentException("Column (name: " +
columnSchema.getName() +
- ", index: " + columnIndex + ") is of type " +
- columnType.getName() + " but was requested as a type " +
expectedType.getName());
+ for (Type type : types) {
+ if (columnType.equals(type)) {
+ return;
+ }
}
+ throw new IllegalArgumentException("Column (name: " +
columnSchema.getName() +
+ ", index: " + columnIndex + ") is of type " +
+ columnType.getName() + " but was requested as a type " +
Arrays.toString(types));
}
@Override
@@ -689,7 +710,7 @@ public class RowResult {
buf.append(getLong(i));
break;
case UNIXTIME_MICROS: {
- buf.append(TimestampUtil.timestampToString(getLong(i)));
+ buf.append(TimestampUtil.timestampToString(getTimestamp(i)));
} break;
case STRING:
buf.append(getString(i));
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
index 63cff02..045f8fb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/DataGenerator.java
@@ -62,10 +62,22 @@ public class DataGenerator {
* @param row the PartialRow to randomize.
*/
public void randomizeRow(PartialRow row) {
+ this.randomizeRow(row, true);
+ }
+
+ /**
+ * Randomizes the fields in a given PartialRow.
+ * @param row the PartialRow to randomize.
+ * @param randomizeKeys true if the key columns should be randomized.
+ */
+ public void randomizeRow(PartialRow row, boolean randomizeKeys) {
Schema schema = row.getSchema();
List<ColumnSchema> columns = schema.getColumns();
for (int i = 0; i < columns.size(); i++) {
ColumnSchema col = columns.get(i);
+ if (col.isKey() && !randomizeKeys) {
+ continue;
+ }
Type type = col.getType();
if (col.isNullable() && random.nextFloat() <= nullRate) {
// Sometimes set nullable columns to null.
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
index f6ebd65..959b8a5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/util/TimestampUtil.java
@@ -80,6 +80,17 @@ public class TimestampUtil {
/**
* Transforms a timestamp into a string, whose formatting and timezone is
consistent
* across Kudu.
+ * @param timestamp the timestamp
+ * @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
+ */
+ public static String timestampToString(Timestamp timestamp) {
+ long micros = timestampToMicros(timestamp);
+ return timestampToString(micros);
+ }
+
+ /**
+ * Transforms a timestamp into a string, whose formatting and timezone is
consistent
+ * across Kudu.
* @param micros the timestamp, in microseconds
* @return a string, in the format: YYYY-MM-DDTHH:MM:SS.ssssssZ
*/
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
index 57a44be..7ed7b55 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -17,34 +17,52 @@
package org.apache.kudu.client;
import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Common;
+import org.apache.kudu.Common.DataType;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
+import org.apache.kudu.client.Operation.ChangeType;
import org.apache.kudu.test.ClientTestUtil;
import org.apache.kudu.test.KuduTestHarness;
import org.apache.kudu.test.RandomUtils;
import org.apache.kudu.util.DataGenerator;
+import org.apache.kudu.util.Pair;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Set;
+import static
org.apache.kudu.client.AsyncKuduScanner.DEFAULT_IS_DELETED_COL_NAME;
import static org.apache.kudu.test.ClientTestUtil.getBasicCreateTableOptions;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestKuduScanner {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestScannerMultiTablet.class);
+
private static final String tableName = "TestKuduScanner";
- private static final Schema basicSchema = ClientTestUtil.getBasicSchema();
+ private static final int DIFF_FLUSH_SEC = 1;
private KuduClient client;
+ private Random random;
+ private DataGenerator generator;
@Rule
public KuduTestHarness harness = new KuduTestHarness();
@@ -52,11 +70,15 @@ public class TestKuduScanner {
+ // Test fixture setup: obtains the client from the test harness and builds a
+ // DataGenerator that draws from the same Random instance stored in 'random'.
@Before
public void setUp() {
client = harness.getClient();
+ random = RandomUtils.getRandom();
+ generator = new DataGenerator.DataGeneratorBuilder()
+ .random(random)
+ .build();
}
@Test(timeout = 100000)
public void testIterable() throws Exception {
- KuduTable table = client.createTable(tableName, basicSchema,
getBasicCreateTableOptions());
+ KuduTable table = client.createTable(tableName, getBasicSchema(),
getBasicCreateTableOptions());
DataGenerator generator = new DataGenerator.DataGeneratorBuilder()
.random(RandomUtils.getRandom())
.build();
@@ -153,4 +175,233 @@ public class TestKuduScanner {
}
}
}
+
+ /**
+ * Exercises a diff scan between two timestamps.
+ *
+ * The test applies three batches of random mutations: one before the start
+ * timestamp, one between the start and end timestamps, and one after the end
+ * timestamp. It then runs a diff scan over the start/end range through
+ * serialized scan tokens and checks that the returned rows match the keys
+ * mutated in the middle batch, with isDeleted() set only for rows whose final
+ * change was a delete.
+ *
+ * The table deliberately contains a user column named
+ * DEFAULT_IS_DELETED_COL_NAME so that the IS_DELETED virtual column has to be
+ * given a different name in the projection.
+ *
+ * NOTE: the order of the writes relative to the getLastPropagatedTimestamp()
+ * calls defines the scan range; do not reorder these statements.
+ */
+ @Test(timeout = 100000)
+ @KuduTestHarness.TabletServerConfig(flags = { "--flush_threshold_secs=" +
DIFF_FLUSH_SEC })
+ public void testDiffScan() throws Exception {
+ Schema schema = new Schema(Arrays.asList(
+ new ColumnSchema.ColumnSchemaBuilder("key",
Type.INT32).key(true).build(),
+ // Include a column with the default IS_DELETED column name to test
collision handling.
+ new ColumnSchema.ColumnSchemaBuilder(DEFAULT_IS_DELETED_COL_NAME,
Type.INT32).build()
+ ));
+
+ KuduTable table = client.createTable(tableName, schema,
getBasicCreateTableOptions());
+
+ // Generate some rows before the start time.
+ int beforeBounds = 5;
+ List<Operation> beforeOps = generateMutationOperations(table,
random.nextInt(beforeBounds),
+ random.nextInt(beforeBounds), random.nextInt(beforeBounds));
+ Map<Integer, ChangeType> before = applyOperations(beforeOps);
+ LOG.info("Before: {}", before);
+
+ // Set the start timestamp after the initial mutations by getting the
propagated timestamp,
+ // and incrementing by 1.
+ long startHT = client.getLastPropagatedTimestamp() + 1;
+ LOG.info("startHT: {}", startHT);
+
+ // Generate row mutations.
+ int mutationBounds = 10;
+ int numInserts = random.nextInt(mutationBounds);
+ int numUpdates = random.nextInt(mutationBounds);
+ int numDeletes = random.nextInt(mutationBounds);
+ List<Operation> operations =
+ generateMutationOperations(table, numInserts, numUpdates, numDeletes);
+ Map<Integer, ChangeType> mutations = applyOperations(operations);
+ LOG.info("Mutations: {}", mutations);
+
+ // Set the end timestamp after the test mutations by getting the
propagated timestamp,
+ // and incrementing by 1.
+ long endHT = client.getLastPropagatedTimestamp() + 1;
+ LOG.info("endHT: {}", endHT);
+
+ // Generate Some Rows after the end time.
+ int afterBounds = 5;
+ List<Operation> afterOps = generateMutationOperations(table,
random.nextInt(afterBounds),
+ random.nextInt(afterBounds), random.nextInt(afterBounds));
+ Map<Integer, ChangeType> after = applyOperations(afterOps);
+ LOG.info("After: {}", after);
+
+ // Diff scan the time range.
+ // Pass through the scan token API to ensure serialization of tokens works
too.
+ List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+ .diffScan(startHT, endHT)
+ .build();
+ List<RowResult> results = new ArrayList<>();
+ for (KuduScanToken token : tokens) {
+ KuduScanner scanner =
KuduScanToken.deserializeIntoScanner(token.serialize(), client);
+
+ // Verify the IS_DELETED column is appended at the end of the projection.
+ Schema projection = scanner.getProjectionSchema();
+ int isDeletedIndex = projection.getIsDeletedIndex();
+ assertEquals(projection.getColumnCount() - 1, isDeletedIndex);
+ // Verify the IS_DELETED column has the correct types.
+ ColumnSchema isDeletedCol = projection.getColumnByIndex(isDeletedIndex);
+ assertEquals(Type.BOOL, isDeletedCol.getType());
+ assertEquals(DataType.IS_DELETED, isDeletedCol.getWireType());
+ // Verify the IS_DELETED column is named to avoid collision.
+ assertEquals(projection.getColumnByIndex(isDeletedIndex),
+ projection.getColumn(DEFAULT_IS_DELETED_COL_NAME + "_"));
+
+ for (RowResult row : scanner) {
+ results.add(row);
+ }
+ }
+ // The mutations map holds one entry per mutated key, so the scan is
+ // expected to return exactly one row for each of those keys.
+ assertEquals(mutations.size(), results.size());
+
+ // Count the results and verify their change type.
+ int resultNumInserts = 0;
+ int resultNumUpdates = 0;
+ int resultNumDeletes = 0;
+ int resultExtra = 0;
+ for (RowResult result : results) {
+ Integer key = result.getInt(0);
+ LOG.info("Processing key {}", key);
+ ChangeType type = mutations.get(key);
+ if (type == ChangeType.INSERT) {
+ assertFalse(result.isDeleted());
+ resultNumInserts++;
+ } else if (type == ChangeType.UPDATE) {
+ assertFalse(result.isDeleted());
+ resultNumUpdates++;
+ } else if (type == ChangeType.DELETE) {
+ assertTrue(result.isDeleted());
+ resultNumDeletes++;
+ } else {
+ // The key was not found in the mutations map. This means that we
somehow managed to scan
+ // a row that was never mutated. It's an error and will trigger an
assert below.
+ assertNull(type);
+ resultExtra++;
+ }
+ }
+ assertEquals(numInserts, resultNumInserts);
+ assertEquals(numUpdates, resultNumUpdates);
+ assertEquals(numDeletes, resultNumDeletes);
+ assertEquals(0, resultExtra);
+ }
+
+ /**
+ * Applies a list of Operations and returns the final ChangeType for each
key.
+ * @param operations the operations to apply.
+ * @return a map of each key and its final ChangeType.
+ */
+ private Map<Integer, ChangeType> applyOperations(List<Operation> operations)
throws Exception {
+ Map<Integer, ChangeType> results = new HashMap<>();
+ KuduSession session = client.newSession();
+ // On some runs, wait long enough to flush at the start.
+ if (random.nextBoolean()) {
+ LOG.info("Waiting for a flush at the start of applyOperations");
+ Thread.sleep(DIFF_FLUSH_SEC + 1);
+ }
+
+ // Pick an int as a flush indicator so we flush once on average while
applying operations.
+ int flushInt = random.nextInt(operations.size());
+ for (Operation op : operations) {
+ // On some runs, wait long enough to flush while applying operations.
+ if (random.nextInt(operations.size()) == flushInt) {
+ LOG.info("Waiting for a flush in the middle of applyOperations");
+ Thread.sleep(DIFF_FLUSH_SEC + 1);
+ }
+ OperationResponse resp = session.apply(op);
+ if (resp.hasRowError()) {
+ LOG.error("Could not mutate row: " +
resp.getRowError().getErrorStatus());
+ }
+ assertFalse(resp.hasRowError());
+ results.put(op.getRow().getInt(0), op.getChangeType());
+ }
+ return results;
+ }
+
+  /**
+   * Generates a list of random mutation operations. Any unique row, identified by
+   * its key, could have a random number of operations/mutations. However, the
+   * target count of numInserts, numUpdates and numDeletes will always be achieved
+   * if the entire list of operations is processed.
+   *
+   * <p>Keys are unique within one call, so every requested row ends in exactly the
+   * requested ChangeType.
+   *
+   * @param table the table to generate operations for
+   * @param numInserts The number of row mutations to end with an insert
+   * @param numUpdates The number of row mutations to end with an update
+   * @param numDeletes The number of row mutations to end with a delete
+   * @return a list of random mutation operations
+   */
+  private List<Operation> generateMutationOperations(
+      KuduTable table, int numInserts, int numUpdates, int numDeletes) throws Exception {
+
+    List<Operation> results = new ArrayList<>();
+    List<MutationState> unfinished = new ArrayList<>();
+    int minMutationsBound = 5;
+
+    // Generate Operations to initialize all of the rows with inserts.
+    List<Pair<ChangeType, Integer>> changeCounts = Arrays.asList(
+        new Pair<>(ChangeType.INSERT, numInserts),
+        new Pair<>(ChangeType.UPDATE, numUpdates),
+        new Pair<>(ChangeType.DELETE, numDeletes));
+    // Keys handed out so far; used to reject duplicate random keys.
+    Set<Integer> keys = new HashSet<>();
+    for (Pair<ChangeType, Integer> changeCount : changeCounts) {
+      ChangeType type = changeCount.getFirst();
+      int count = changeCount.getSecond();
+      for (int i = 0; i < count; i++) {
+        // Generate a random insert. Retry with a fresh insert if the random key was
+        // already used: a duplicate would make two MutationStates share one row, which
+        // breaks the final counts and leads to invalid operations on that row.
+        Insert insert;
+        int key;
+        do {
+          insert = table.newInsert();
+          PartialRow row = insert.getRow();
+          generator.randomizeRow(row);
+          key = row.getInt(0);
+        } while (!keys.add(key));
+        // Add the insert to the results.
+        results.add(insert);
+        // Initialize the unfinished MutationState.
+        unfinished.add(new MutationState(key, type, random.nextInt(minMutationsBound)));
+      }
+    }
+
+    // Randomly pull from the unfinished list, mutate it and add that operation to the results.
+    // If it has been mutated at least the minimum number of times, remove it from the unfinished
+    // list.
+    while (!unfinished.isEmpty()) {
+      // Get a random row to mutate.
+      int index = random.nextInt(unfinished.size());
+      MutationState state = unfinished.get(index);
+
+      // If the row is done, remove it from unfinished and continue.
+      if (state.numMutations >= state.minMutations && state.currentType == state.endType) {
+        unfinished.remove(index);
+        continue;
+      }
+
+      // Otherwise, generate an operation to mutate the row based on its current ChangeType.
+      //    insert -> update|delete
+      //    update -> update|delete
+      //    delete -> insert
+      Operation op;
+      if (state.currentType == ChangeType.INSERT || state.currentType == ChangeType.UPDATE) {
+        op = (random.nextBoolean()) ? table.newUpdate() : table.newDelete();
+      } else {
+        // Must be a delete, so we need an insert next.
+        op = table.newInsert();
+      }
+      PartialRow row = table.getSchema().newPartialRow();
+      row.addInt(0, state.key);
+      generator.randomizeRow(row, /* randomizeKeys */ false);
+      op.setRow(row);
+      results.add(op);
+
+      state.currentType = op.getChangeType();
+      state.numMutations++;
+    }
+
+    return results;
+  }
+
+  /**
+   * Mutable bookkeeping for one row while random mutation operations are generated.
+   */
+  private static class MutationState {
+    /** Key of the row this state describes. */
+    final int key;
+    /** ChangeType the row is required to end on. */
+    final ChangeType endType;
+    /** Lower bound on the number of mutations recorded before the row may finish. */
+    final int minMutations;
+
+    /** ChangeType of the latest operation recorded for the row. */
+    ChangeType currentType;
+    /** Number of mutations recorded so far. */
+    int numMutations;
+
+    MutationState(int key, ChangeType endType, int minMutations) {
+      this.minMutations = minMutations;
+      this.endType = endType;
+      this.key = key;
+      // A new state always starts as an insert with no mutations recorded.
+      this.currentType = ChangeType.INSERT;
+      this.numMutations = 0;
+    }
+  }
}
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index edb3f00..988a471 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -77,10 +77,16 @@ message ScanTokenPB {
// See common.proto for further information about read modes.
optional ReadMode read_mode = 10 [default = READ_LATEST];
- // The requested snapshot timestamp. This is only used O
+ // The requested snapshot timestamp. This is only used
// when the read mode is set to READ_AT_SNAPSHOT.
optional fixed64 snap_timestamp = 11;
+ // The requested snapshot start timestamp. This is only used
+ // when the read mode is set to READ_AT_SNAPSHOT.
+ // Setting this indicates the scan should be a diff scan; in that case
+ // snap_timestamp is used as the snapshot end timestamp.
+ optional fixed64 snap_start_timestamp = 19;
+
// Sent by clients which previously executed CLIENT_PROPAGATED writes.
// This updates the server's time so that no transaction will be assigned
// a timestamp lower than or equal to 'previous_known_timestamp'