This is an automated email from the ASF dual-hosted git repository.
granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 34e8562 [java] Support columnar row format in Java client.
34e8562 is described below
commit 34e85622f37f88b0436d9ba51df7592bf0e159de
Author: Shuping Zhou <[email protected]>
AuthorDate: Sun May 24 00:16:42 2020 +0800
[java] Support columnar row format in Java client.
This change only adds a setRowDataFormat() method to KuduScanner and
AsyncKuduScanner, and leaves the rest of the application-facing
interfaces unchanged.
Benchmark              (numRows)  (reuseResultRow)   Mode  Cnt         Score         Error  Units
testRowResultOriginal          1              true  thrpt    5  10746644.238 ± 5279029.400  ops/s
testRowwiseResult              1              true  thrpt    5  10341141.084 ± 1027963.366  ops/s
testColumnarResult             1              true  thrpt    5  15572578.025 ±  834029.227  ops/s
testRowResultOriginal          1             false  thrpt    5  10657631.661 ± 1269080.856  ops/s
testRowwiseResult              1             false  thrpt    5  10922244.600 ±  392930.469  ops/s
testColumnarResult             1             false  thrpt    5  15861887.600 ±  122277.115  ops/s
testRowResultOriginal         10              true  thrpt    5   1576792.893 ±    5695.536  ops/s
testRowwiseResult             10              true  thrpt    5   1498735.241 ±  121545.258  ops/s
testColumnarResult            10              true  thrpt    5   1642865.093 ±   79125.314  ops/s
testRowResultOriginal         10             false  thrpt    5   1166811.843 ±   43376.177  ops/s
testRowwiseResult             10             false  thrpt    5   1055728.419 ±   46228.437  ops/s
testColumnarResult            10             false  thrpt    5   1552995.973 ±  100622.958  ops/s
testRowResultOriginal      10000              true  thrpt    5      1363.470 ±      70.807  ops/s
testRowwiseResult          10000              true  thrpt    5      1318.791 ±     111.350  ops/s
testColumnarResult         10000              true  thrpt    5      1288.663 ±     157.520  ops/s
testRowResultOriginal      10000             false  thrpt    5      1028.611 ±      97.045  ops/s
testRowwiseResult          10000             false  thrpt    5      1030.924 ±      35.353  ops/s
testColumnarResult         10000             false  thrpt    5      1223.328 ±     230.125  ops/s
Change-Id: I580c9bb48e797ca8de9d88e7892df6860eed7070
Reviewed-on: http://gerrit.cloudera.org:8080/15983
Tested-by: Kudu Jenkins
Reviewed-by: Grant Henke <[email protected]>
---
java/kudu-client/build.gradle | 12 +
.../kudu/client/RowResultIteratorBenchmark.java | 184 ++++++++++
.../org/apache/kudu/client/AsyncKuduScanner.java | 55 ++-
.../org/apache/kudu/client/ColumnarRowResult.java | 311 +++++++++++++++++
.../kudu/client/ColumnarRowResultIterator.java | 130 +++++++
.../java/org/apache/kudu/client/KuduScanner.java | 9 +
.../java/org/apache/kudu/client/RowResult.java | 265 +++------------
.../org/apache/kudu/client/RowResultIterator.java | 87 +----
.../org/apache/kudu/client/RowwiseRowResult.java | 372 +++++++++++++++++++++
...Iterator.java => RowwiseRowResultIterator.java} | 89 ++---
.../java/org/apache/kudu/client/TestRowResult.java | 266 ++++++++-------
11 files changed, 1310 insertions(+), 470 deletions(-)
diff --git a/java/kudu-client/build.gradle b/java/kudu-client/build.gradle
index 6ac2f07..15ea439 100644
--- a/java/kudu-client/build.gradle
+++ b/java/kudu-client/build.gradle
@@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+// JMH plugin used to build and run the benchmarks under src/jmh (see the jmh
+// dependencies below and RowResultIteratorBenchmark).
+plugins {
+ id "me.champeau.gradle.jmh" version "0.5.0"
+}
+
apply from: "$rootDir/gradle/protobuf.gradle"
apply from: "$rootDir/gradle/shadow.gradle"
@@ -45,6 +49,9 @@ dependencies {
testCompile libs.log4j
testCompile libs.log4jSlf4jImpl
testCompile libs.mockitoCore
+
+ jmh 'org.openjdk.jmh:jmh-core:1.23'
+ jmh 'org.openjdk.jmh:jmh-generator-annprocess:1.23'
}
// Add protobuf files to the proto source set.
@@ -57,3 +64,8 @@ sourceSets {
}
}
}
+
+jmh {
+ // Options for the JMH benchmark jar built by the jmh plugin: exclude duplicate
+ // classes instead of failing, and allow the ZIP64 archive format (option meanings per
+ // the jmh-gradle-plugin README — confirm when upgrading the plugin version).
+ duplicateClassesStrategy = 'exclude'
+ zip64 = true
+}
\ No newline at end of file
diff --git a/java/kudu-client/src/jmh/java/org/apache/kudu/client/RowResultIteratorBenchmark.java b/java/kudu-client/src/jmh/java/org/apache/kudu/client/RowResultIteratorBenchmark.java
new file mode 100644
index 0000000..4ede40c
--- /dev/null
+++ b/java/kudu-client/src/jmh/java/org/apache/kudu/client/RowResultIteratorBenchmark.java
@@ -0,0 +1,184 @@
+package org.apache.kudu.client;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.util.CharsetUtil;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.util.Slice;
+import org.apache.kudu.util.Slices;
+
+@State(Scope.Thread)
+@Fork(1)
+public class RowResultIteratorBenchmark {
+
+ final Schema schema;
+
+ // for rowwise result
+ Slice rowwiseBs;
+ Slice rowwiseIndirectBs;
+
+ // for columnar result
+ Slice[] columnarData;
+ Slice[] columnarVarlenData;
+ Slice[] columnarNonNullBitmaps;
+
+ @Param({"true", "false"})
+ boolean reuseResultRow;
+
+ @Param({"1", "10", "10000"})
+ int numRows;
+
+ public RowResultIteratorBenchmark() {
+ List<ColumnSchema> columns = new ArrayList<>();
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("action",
Type.STRING).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("time",
Type.INT32).key(true).build());
+ columns.add(new ColumnSchema.ColumnSchemaBuilder("seq",
Type.INT64).key(true).build());
+ this.schema = new Schema(columns);
+ }
+
+ @Setup
+ public void prepare() {
+ prepareRowwiseSlices();
+ prepareColumnarSlices();
+ }
+
+ private void prepareRowwiseSlices() {
+ byte[] data = new byte[numRows * schema.getRowSize()];
+ byte[] vardata = new byte[numRows * 10];
+
+ int offset = 0;
+ int vardataOffset = 0;
+
+ for (int i = 0; i < numRows; i++) {
+ String action = "action" + i;
+ int actionLen = action.getBytes(CharsetUtil.UTF_8).length;
+
+ offset += writeLong(data, offset, vardataOffset);
+ offset += writeLong(data, offset, actionLen);
+ offset += writeInt(data, offset, i);
+ offset += writeLong(data, offset, i * 10000L);
+
+ vardataOffset += writeString(vardata, vardataOffset, action);
+ }
+
+ rowwiseBs = Slices.wrappedBuffer(data);
+ rowwiseIndirectBs = Slices.wrappedBuffer(vardata);
+ }
+
+ private void prepareColumnarSlices() {
+ byte[][] data = new byte[3][];
+ data[0] = new byte[4 * (numRows + 1)];
+ data[1] = new byte[4 * numRows];
+ data[2] = new byte[8 * numRows];
+
+ byte[][] varData = new byte[3][];
+ varData[0] = new byte[numRows * 10];
+ varData[1] = new byte[0];
+ varData[2] = new byte[0];
+
+ byte[][] nonNullBitmaps = new byte[3][];
+ nonNullBitmaps[0] = new byte[0];
+ nonNullBitmaps[1] = new byte[0];
+ nonNullBitmaps[2] = new byte[0];
+
+ int dataOffset0 = 0;
+ int dataOffset1 = 0;
+ int dataOffset2 = 0;
+
+ int varDataOffset0 = 0;
+
+ for (int i = 0; i < numRows; i++) {
+ String action = "action" + i;
+
+ dataOffset0 += writeInt(data[0], dataOffset0, varDataOffset0);
+ varDataOffset0 += writeString(varData[0], varDataOffset0, action);
+
+ dataOffset1 += writeInt(data[1], dataOffset1, i);
+ dataOffset2 += writeLong(data[2], dataOffset2, i * 10000L);
+ }
+ // write offset for last row.
+ writeInt(data[0], dataOffset0, varDataOffset0);
+
+ columnarData = new Slice[3];
+ columnarVarlenData = new Slice[3];
+ columnarNonNullBitmaps = new Slice[3];
+ for (int i = 0; i < 3; i++) {
+ columnarData[i] = Slices.wrappedBuffer(data[i]);
+ columnarVarlenData[i] = Slices.wrappedBuffer(varData[i]);
+ columnarNonNullBitmaps[i] = Slices.wrappedBuffer(nonNullBitmaps[i]);
+ }
+ }
+
+ @Benchmark
+ public void testRowwiseResult(Blackhole blackhole) {
+ RowResultIterator iter = new RowwiseRowResultIterator(
+ 0, "uuid", schema, numRows,
+ rowwiseBs, rowwiseIndirectBs, reuseResultRow);
+
+ while (iter.hasNext()) {
+ RowResult row = iter.next();
+ String action = row.getString(0);
+ int time = row.getInt(1);
+ long seq = row.getLong(2);
+
+ blackhole.consume(action);
+ blackhole.consume(time);
+ blackhole.consume(seq);
+ }
+ }
+
+ @Benchmark
+ public void testColumnarResult(Blackhole blackhole) {
+ RowResultIterator iter = new ColumnarRowResultIterator(
+ 0, "uuid", schema, numRows,
+ columnarData, columnarVarlenData, columnarNonNullBitmaps,
reuseResultRow);
+
+ while (iter.hasNext()) {
+ RowResult row = iter.next();
+ String action = row.getString(0);
+ int time = row.getInt(1);
+ long seq = row.getLong(2);
+
+ blackhole.consume(action);
+ blackhole.consume(time);
+ blackhole.consume(seq);
+ }
+ }
+
+ private static int writeInt(final byte[] b, final int offset, final int
value) {
+ b[offset + 0] = (byte) (value >> 0);
+ b[offset + 1] = (byte) (value >> 8);
+ b[offset + 2] = (byte) (value >> 16);
+ b[offset + 3] = (byte) (value >> 24);
+ return 4;
+ }
+
+ private static int writeLong(final byte[] b, final int offset, final long
value) {
+ b[offset + 0] = (byte) (value >> 0);
+ b[offset + 1] = (byte) (value >> 8);
+ b[offset + 2] = (byte) (value >> 16);
+ b[offset + 3] = (byte) (value >> 24);
+ b[offset + 4] = (byte) (value >> 32);
+ b[offset + 5] = (byte) (value >> 40);
+ b[offset + 6] = (byte) (value >> 48);
+ b[offset + 7] = (byte) (value >> 56);
+ return 8;
+ }
+
+ private static int writeString(final byte[] b, final int offset, final
String value) {
+ byte[] data = value.getBytes(CharsetUtil.UTF_8);
+ System.arraycopy(data, 0, b, offset, data.length);
+ return data.length;
+ }
+}
\ No newline at end of file
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 7fca149..3d1f0fa 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -151,6 +151,31 @@ public final class AsyncKuduScanner {
}
}
+  /**
+   * Expected row data format in scanner result set.
+   *
+   * <p>The server may or may not support the expected layout. The actual layout is
+   * hidden internally by the {@link RowResult} and {@link RowResultIterator} interfaces,
+   * so it is transparent to application code.
+   */
+  @InterfaceAudience.Public
+  @InterfaceStability.Evolving
+  public enum RowDataFormat {
+    /**
+     * Server is expected to return scanner result data in row-wise layout.
+     * This is currently the default layout.
+     */
+    ROWWISE,
+
+    /**
+     * Server is expected to return scanner result data in columnar layout.
+     * This layout is more efficient in processing and bandwidth for both the server and
+     * the client side. It requires server support (kudu-1.12.0 and later); if the server
+     * does not support it, the data is still returned in row-wise layout.
+     */
+    COLUMNAR,
+  }
+
// This is private because it is not safe to use this column name as it may
be
// different in the case of collisions. Instead the `IS_DELETED` column
should
// be looked up by type.
@@ -230,6 +255,8 @@ public final class AsyncKuduScanner {
private long numRowsReturned = 0;
+ private RowDataFormat rowDataFormat = RowDataFormat.ROWWISE;
+
/**
* The tabletSlice currently being scanned.
* If null, we haven't started scanning.
@@ -484,6 +511,15 @@ public final class AsyncKuduScanner {
}
/**
+ * Optionally set expected row data format.
+ *
+ * @param rowDataFormat Row data format to be expected.
+ */
+ public void setRowDataFormat(RowDataFormat rowDataFormat) {
+ this.rowDataFormat = rowDataFormat;
+ }
+
+ /**
* Scans a number of rows.
* <p>
* Once this method returns {@code null} once (which indicates that this
@@ -1043,6 +1079,12 @@ public final class AsyncKuduScanner {
newBuilder.setTabletId(UnsafeByteOperations.unsafeWrap(tablet.getTabletIdAsBytes()));
newBuilder.setOrderMode(AsyncKuduScanner.this.getOrderMode());
newBuilder.setCacheBlocks(cacheBlocks);
+
+ long rowFormatFlags = Tserver.RowFormatFlags.NO_FLAGS_VALUE;
+ if (rowDataFormat == RowDataFormat.COLUMNAR) {
+ rowFormatFlags |=
Tserver.RowFormatFlags.COLUMNAR_LAYOUT.getNumber();
+ }
+ newBuilder.setRowFormatFlags(rowFormatFlags);
// If the last propagated timestamp is set, send it with the scan.
// For READ_YOUR_WRITES scan, use the propagated timestamp from
// the scanner.
@@ -1140,9 +1182,16 @@ public final class AsyncKuduScanner {
}
}
// TODO: Find a clean way to plumb in reuseRowResult.
- RowResultIterator iterator = RowResultIterator.makeRowResultIterator(
- timeoutTracker.getElapsedMillis(), tsUUID, schema, resp.getData(),
callResponse,
- reuseRowResult);
+ RowResultIterator iterator;
+ if (resp.hasData()) {
+ iterator = RowwiseRowResultIterator.makeRowResultIterator(
+ timeoutTracker.getElapsedMillis(), tsUUID, schema,
resp.getData(),
+ callResponse, reuseRowResult);
+ } else {
+ iterator = ColumnarRowResultIterator.makeRowResultIterator(
+ timeoutTracker.getElapsedMillis(), tsUUID, schema,
resp.getColumnarData(),
+ callResponse, reuseRowResult);
+ }
boolean hasMore = resp.getHasMoreResults();
if (id.length != 0 && scannerId != null && !Bytes.equals(scannerId, id))
{
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResult.java
new file mode 100644
index 0000000..721c6b2
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResult.java
@@ -0,0 +1,311 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.util.Slice;
+import org.apache.kudu.util.TimestampUtil;
+
+/**
+ * RowResult represents one row from a scanner, in columnar layout.
+ */
[email protected]
[email protected]
+class ColumnarRowResult extends RowResult {
+
+ private final Slice[] data;
+ private final Slice[] varlenData;
+ private final Slice[] nonNullBitmaps;
+
+ /**
+ * Prepares the row representation using the provided data. Doesn't copy data
+ * out of the byte arrays. Package private.
+ * @param schema Schema used to build the rowData
+ * @param data The raw columnar data corresponding to the primitive-typed
columns
+ * @param varlenData The variable-length data for the variable-length-typed
columns
+ * @param nonNullBitmaps The bitmaps corresponding to the non-null status of
the cells
+ * @param rowIndex The index of the row in data/varlenData/nonNullBitmaps
+ */
+ ColumnarRowResult(Schema schema, Slice[] data, Slice[] varlenData, Slice[]
nonNullBitmaps,
+ int rowIndex) {
+ super(schema, rowIndex);
+ this.data = data;
+ this.varlenData = varlenData;
+ this.nonNullBitmaps = nonNullBitmaps;
+ advancePointerTo(rowIndex);
+ }
+
+ /**
+ * Get the specified column's integer
+ * @param columnIndex Column index in the schema
+ * @return an integer
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public int getInt(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT32, Type.DATE);
+ return Bytes.getInt(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 4);
+ }
+
+ /**
+ * Get the specified column's short
+ * @param columnIndex Column index in the schema
+ * @return a short
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public short getShort(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT16);
+ return Bytes.getShort(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 2);
+ }
+
+ /**
+ * Get the specified column's boolean
+ * @param columnIndex Column index in the schema
+ * @return a boolean
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public boolean getBoolean(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.BOOL);
+ byte b = Bytes.getByte(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index);
+ return b == 1;
+ }
+
+ /**
+ * Get the specified column's byte
+ * @param columnIndex Column index in the schema
+ * @return a byte
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public byte getByte(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT8);
+ return Bytes.getByte(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index);
+ }
+
+ /**
+ * Get the specified column's float
+ * @param columnIndex Column index in the schema
+ * @return a float
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public float getFloat(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.FLOAT);
+ return Bytes.getFloat(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 4);
+ }
+
+ /**
+ * Get the specified column's double
+ * @param columnIndex Column index in the schema
+ * @return a double
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public double getDouble(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.DOUBLE);
+ return Bytes.getDouble(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 8);
+ }
+
+ /**
+ * Get the specified column's Decimal.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a BigDecimal.
+ * @throws IllegalArgumentException if the column is null
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public BigDecimal getDecimal(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.DECIMAL);
+ ColumnSchema column = schema.getColumnByIndex(columnIndex);
+ ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+ return Bytes.getDecimal(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() +
Type.DECIMAL.getSize(typeAttributes) * index,
+ typeAttributes.getPrecision(), typeAttributes.getScale());
+ }
+
+ /**
+ * Get the specified column's Timestamp.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a Timestamp
+ * @throws IllegalArgumentException if the column is null, is unset,
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public Timestamp getTimestamp(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.UNIXTIME_MICROS);
+ long micros = getLong(columnIndex);
+ return TimestampUtil.microsToTimestamp(micros);
+ }
+
+ @Override
+ public String getVarLengthData(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.STRING, Type.VARCHAR);
+ // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but
we only support ints.
+ int offset = getOffsetForCurrentRow(columnIndex);
+ int length = getOffsetForNextRow(columnIndex) - offset;
+ assert offset < Integer.MAX_VALUE;
+ assert length < Integer.MAX_VALUE;
+ return Bytes.getString(varlenData[columnIndex].getRawArray(),
+ varlenData[columnIndex].getRawOffset() + offset,
+ length);
+ }
+
+ /**
+ * Get a copy of the specified column's binary data.
+ * @param columnIndex Column index in the schema
+ * @return a byte[] with the binary data.
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public byte[] getBinaryCopy(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
+ // but we only support ints.
+ int offset = getOffsetForCurrentRow(columnIndex);
+ int length = getOffsetForNextRow(columnIndex) - offset;
+ assert offset < Integer.MAX_VALUE;
+ assert length < Integer.MAX_VALUE;
+ byte[] ret = new byte[length];
+ System.arraycopy(varlenData[columnIndex].getRawArray(),
+ varlenData[columnIndex].getRawOffset() + offset,
+ ret, 0, length);
+ return ret;
+ }
+
+ /**
+ * Get the specified column's binary data.
+ *
+ * This doesn't copy the data and instead returns a ByteBuffer that wraps it.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a ByteBuffer with the binary data.
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public ByteBuffer getBinary(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.BINARY);
+ // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
+ // but we only support ints.
+ int offset = getOffsetForCurrentRow(columnIndex);
+ int length = getOffsetForNextRow(columnIndex) - offset;
+ assert offset < Integer.MAX_VALUE;
+ assert length < Integer.MAX_VALUE;
+ return ByteBuffer.wrap(varlenData[columnIndex].getRawArray(),
+ varlenData[columnIndex].getRawOffset() + offset, length);
+ }
+
+ @Override
+ public long getLong(int columnIndex) {
+ return Bytes.getLong(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 8);
+ }
+
+ protected int getOffsetForCurrentRow(int columnIndex) {
+ return Bytes.getInt(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + index * 4);
+ }
+
+ protected int getOffsetForNextRow(int columnIndex) {
+ return Bytes.getInt(this.data[columnIndex].getRawArray(),
+ this.data[columnIndex].getRawOffset() + (index + 1) * 4);
+ }
+
+ /**
+ * Get if the specified column is NULL
+ * @param columnIndex Column index in the schema
+ * @return true if the column cell is null and the column is nullable,
+ * false otherwise
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public boolean isNull(int columnIndex) {
+ checkValidColumn(columnIndex);
+ if (!schema.getColumnByIndex(columnIndex).isNullable()) {
+ return false;
+ }
+
+ byte flag = Bytes.getByte(this.nonNullBitmaps[columnIndex].getRawArray(),
+ this.nonNullBitmaps[columnIndex].getRawOffset() + index / 8);
+
+ boolean nonNull = (flag & (1 << (index % 8))) != 0;
+ return !nonNull;
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnarRowResult index: " + this.index;
+ }
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResultIterator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResultIterator.java
new file mode 100644
index 0000000..07e5d9e
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ColumnarRowResultIterator.java
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.util.NoSuchElementException;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.Schema;
+import org.apache.kudu.WireProtocol;
+import org.apache.kudu.util.Slice;
+
+/**
+ * Class that contains the rows in columnar layout sent by a tablet server.
+ * Exhausting this iterator only means that all the rows from the last server response
+ * were read.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@SuppressWarnings("IterableAndIterator")
+class ColumnarRowResultIterator extends RowResultIterator {
+
+  private static final ColumnarRowResultIterator EMPTY =
+      new ColumnarRowResultIterator(0, null, null, 0,
+                                    null, null, null, false);
+
+  private final Slice[] data;
+  private final Slice[] varlenData;
+  private final Slice[] nonNullBitmaps;
+  // Non-null only when rows are reused; next() then repositions this single instance.
+  private final RowResult sharedRowResult;
+
+  /**
+   * Package private constructor, only meant to be instantiated from AsyncKuduScanner.
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param tsUUID UUID of the tablet server that handled our request
+   * @param schema schema used to parse the rows
+   * @param numRows how many rows are contained in the columnar data
+   * @param data The raw columnar data corresponding to the primitive-typed columns
+   * @param varlenData The variable-length data for the variable-length-typed columns
+   * @param nonNullBitmaps The bitmaps corresponding to the non-null status of the cells
+   * @param reuseRowResult reuse same row result for next row
+   */
+  ColumnarRowResultIterator(long elapsedMillis,
+                            String tsUUID,
+                            Schema schema,
+                            int numRows,
+                            Slice[] data,
+                            Slice[] varlenData,
+                            Slice[] nonNullBitmaps,
+                            boolean reuseRowResult) {
+    super(elapsedMillis, tsUUID, schema, numRows, reuseRowResult);
+    this.data = data;
+    this.varlenData = varlenData;
+    this.nonNullBitmaps = nonNullBitmaps;
+    this.sharedRowResult = (reuseRowResult && numRows != 0) ?
+        new ColumnarRowResult(this.schema, data, varlenData, nonNullBitmaps, -1) :
+        null;
+  }
+
+  /**
+   * Builds an iterator over a columnar row block from a scan response. Each column's data,
+   * var-length data and non-null bitmap are looked up in the RPC's sidecars.
+   * @param elapsedMillis time in milliseconds since RPC creation to now
+   * @param tsUUID UUID of the tablet server that handled our request
+   * @param schema schema used to parse the rows
+   * @param data the columnar row block; null or zero rows yields an empty iterator
+   * @param callResponse the RPC response holding the sidecars referenced by {@code data}
+   * @param reuseRowResult reuse same row result for next row
+   * @return an iterator over the rows in {@code data}
+   */
+  static ColumnarRowResultIterator makeRowResultIterator(long elapsedMillis,
+                                                         String tsUUID,
+                                                         Schema schema,
+                                                         WireProtocol.ColumnarRowBlockPB data,
+                                                         final CallResponse callResponse,
+                                                         boolean reuseRowResult)
+      throws KuduException {
+    if (data == null || data.getNumRows() == 0) {
+      return new ColumnarRowResultIterator(elapsedMillis, tsUUID, schema, 0,
+          null, null, null, reuseRowResult);
+    }
+
+    int numColumns = data.getColumnsCount();
+    Slice[] dataSlices = new Slice[numColumns];
+    Slice[] varlenDataSlices = new Slice[numColumns];
+    Slice[] nonNullBitmapSlices = new Slice[numColumns];
+
+    for (int i = 0; i < numColumns; i++) {
+      WireProtocol.ColumnarRowBlockPB.Column column = data.getColumns(i);
+      dataSlices[i] = callResponse.getSidecar(column.getDataSidecar());
+      // NOTE(review): these two sidecar indices are read even for columns that are not
+      // var-length / not nullable, which relies on getSidecar() tolerating the proto
+      // field's default value. Consider guarding with hasVarlenDataSidecar() /
+      // hasNonNullBitmapSidecar() — TODO confirm against CallResponse.getSidecar.
+      varlenDataSlices[i] = callResponse.getSidecar(column.getVarlenDataSidecar());
+      nonNullBitmapSlices[i] = callResponse.getSidecar(column.getNonNullBitmapSidecar());
+    }
+    int numRows = Math.toIntExact(data.getNumRows());
+
+    return new ColumnarRowResultIterator(elapsedMillis, tsUUID, schema, numRows,
+        dataSlices, varlenDataSlices, nonNullBitmapSlices, reuseRowResult);
+  }
+
+  /**
+   * @return an empty row result iterator
+   */
+  public static ColumnarRowResultIterator empty() {
+    return EMPTY;
+  }
+
+  @Override
+  public RowResult next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+    int rowIndex = this.currentRow++;
+    // If sharedRowResult is not null, we should reuse it for every next call.
+    if (sharedRowResult != null) {
+      this.sharedRowResult.advancePointerTo(rowIndex);
+      return sharedRowResult;
+    }
+    return new ColumnarRowResult(this.schema, this.data, this.varlenData,
+        this.nonNullBitmaps, rowIndex);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnarRowResultIterator for " + this.numRows + " rows";
+  }
+
+}
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 8e5505c..6e16a7b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -60,6 +60,15 @@ public class KuduScanner implements Iterable<RowResult> {
}
/**
+ * Optionally set expected row data format.
+ *
+ * @param rowDataFormat Row data format to be expected.
+ */
+ public void setRowDataFormat(AsyncKuduScanner.RowDataFormat rowDataFormat) {
+ asyncScanner.setRowDataFormat(rowDataFormat);
+ }
+
+ /**
* Scans a number of rows.
* <p>
* Once this method returns {@code null} once (which indicates that this
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
index 6470e9d..b3e81cf 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResult.java
@@ -22,17 +22,14 @@ import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Arrays;
-import java.util.BitSet;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.kudu.ColumnSchema;
-import org.apache.kudu.ColumnTypeAttributes;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.util.DateUtil;
-import org.apache.kudu.util.Slice;
import org.apache.kudu.util.TimestampUtil;
/**
@@ -40,53 +37,22 @@ import org.apache.kudu.util.TimestampUtil;
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
-public class RowResult {
+public abstract class RowResult {
- private static final int INDEX_RESET_LOCATION = -1;
+ protected static final int INDEX_RESET_LOCATION = -1;
+ protected int index = INDEX_RESET_LOCATION;
- private final Schema schema;
- private final Slice indirectData;
- private final int rowSize;
- private final int[] columnOffsets;
-
- private Slice rowData;
- private int index = INDEX_RESET_LOCATION;
- private int offset;
- private BitSet nullsBitSet;
+ protected final Schema schema;
/**
* Prepares the row representation using the provided data. Doesn't copy data
* out of the byte arrays. Package private.
* @param schema Schema used to build the rowData
- * @param rowData The Slice of data returned by the tablet server
- * @param indirectData The full indirect data that contains the strings
* @param rowIndex The index of the row in the rowData that this RowResult
represents
*/
- RowResult(Schema schema, Slice rowData, Slice indirectData, int rowIndex) {
+ RowResult(Schema schema, int rowIndex) {
this.schema = schema;
- this.rowData = rowData;
- this.indirectData = indirectData;
- this.rowSize = this.schema.getRowSize();
- int columnOffsetsSize = schema.getColumnCount();
- if (schema.hasNullableColumns()) {
- columnOffsetsSize++;
- }
- columnOffsets = new int[columnOffsetsSize];
- // Empty projection, usually used for quick row counting.
- if (columnOffsetsSize == 0) {
- return;
- }
- int currentOffset = 0;
- columnOffsets[0] = currentOffset;
- // Pre-compute the columns offsets in rowData for easier lookups later.
- // If the schema has nullables, we also add the offset for the null bitmap
at the end.
- for (int i = 1; i < columnOffsetsSize; i++) {
- org.apache.kudu.ColumnSchema column = schema.getColumnByIndex(i - 1);
- int previousSize = column.getTypeSize();
- columnOffsets[i] = previousSize + currentOffset;
- currentOffset += previousSize;
- }
- advancePointerTo(rowIndex);
+ this.index = rowIndex;
}
void resetPointer() {
@@ -98,18 +64,6 @@ public class RowResult {
*/
void advancePointerTo(int rowIndex) {
this.index = rowIndex;
- this.offset = this.rowSize * this.index;
- if (schema.hasNullableColumns() && this.index != INDEX_RESET_LOCATION) {
- this.nullsBitSet = Bytes.toBitSet(
- this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(schema.getColumnCount()),
- schema.getColumnCount());
- }
- }
-
- int getCurrentRowDataOffsetForColumn(int columnIndex) {
- return this.offset + this.columnOffsets[columnIndex];
}
/**
@@ -119,7 +73,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public int getInt(String columnName) {
+ public final int getInt(String columnName) {
return getInt(this.schema.getColumnIndex(columnName));
}
@@ -131,13 +85,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public int getInt(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.INT32, Type.DATE);
- return Bytes.getInt(this.rowData.getRawArray(),
- this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract int getInt(int columnIndex);
/**
* Get the specified column's short
@@ -146,7 +94,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public short getShort(String columnName) {
+ public final short getShort(String columnName) {
return getShort(this.schema.getColumnIndex(columnName));
}
@@ -158,13 +106,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public short getShort(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.INT16);
- return Bytes.getShort(this.rowData.getRawArray(),
- this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract short getShort(int columnIndex);
/**
* Get the specified column's boolean
@@ -173,7 +115,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public boolean getBoolean(String columnName) {
+ public final boolean getBoolean(String columnName) {
return getBoolean(this.schema.getColumnIndex(columnName));
}
@@ -185,15 +127,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public boolean getBoolean(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.BOOL);
- byte b = Bytes.getByte(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
- return b == 1;
- }
+ public abstract boolean getBoolean(int columnIndex);
/**
* Get the specified column's byte
@@ -202,7 +136,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public byte getByte(String columnName) {
+ public final byte getByte(String columnName) {
return getByte(this.schema.getColumnIndex(columnName));
}
@@ -215,13 +149,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public byte getByte(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.INT8);
- return Bytes.getByte(this.rowData.getRawArray(),
- this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract byte getByte(int columnIndex);
/**
* Get the specified column's long
@@ -233,7 +161,7 @@ public class RowResult {
* @return a positive long
* @throws IllegalArgumentException if the column doesn't exist or is null
*/
- public long getLong(String columnName) {
+ public final long getLong(String columnName) {
return getLong(this.schema.getColumnIndex(columnName));
}
@@ -248,12 +176,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column is null
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public long getLong(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.INT64, Type.UNIXTIME_MICROS);
- return getLongOrOffset(columnIndex);
- }
+ public abstract long getLong(int columnIndex);
/**
* Get the specified column's float
@@ -262,7 +185,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public float getFloat(String columnName) {
+ public final float getFloat(String columnName) {
return getFloat(this.schema.getColumnIndex(columnName));
}
@@ -274,14 +197,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public float getFloat(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.FLOAT);
- return Bytes.getFloat(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract float getFloat(int columnIndex);
/**
* Get the specified column's double
@@ -290,7 +206,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public double getDouble(String columnName) {
+ public final double getDouble(String columnName) {
return getDouble(this.schema.getColumnIndex(columnName));
}
@@ -303,14 +219,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public double getDouble(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.DOUBLE);
- return Bytes.getDouble(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract double getDouble(int columnIndex);
/**
* Get the specified column's Decimal.
@@ -319,7 +228,7 @@ public class RowResult {
* @return a BigDecimal
* @throws IllegalArgumentException if the column doesn't exist or is null
*/
- public BigDecimal getDecimal(String columnName) {
+ public final BigDecimal getDecimal(String columnName) {
return getDecimal(this.schema.getColumnIndex(columnName));
}
@@ -331,16 +240,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column is null
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public BigDecimal getDecimal(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.DECIMAL);
- ColumnSchema column = schema.getColumnByIndex(columnIndex);
- ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
- return Bytes.getDecimal(this.rowData.getRawArray(),
- this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex),
- typeAttributes.getPrecision(), typeAttributes.getScale());
- }
+ public abstract BigDecimal getDecimal(int columnIndex);
/**
* Get the specified column's Timestamp.
@@ -350,7 +250,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist,
* is null, is unset, or the type doesn't match the column's type
*/
- public Timestamp getTimestamp(String columnName) {
+ public final Timestamp getTimestamp(String columnName) {
return getTimestamp(this.schema.getColumnIndex(columnName));
}
@@ -363,13 +263,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public Timestamp getTimestamp(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.UNIXTIME_MICROS);
- long micros = getLongOrOffset(columnIndex);
- return TimestampUtil.microsToTimestamp(micros);
- }
+ public abstract Timestamp getTimestamp(int columnIndex);
/**
* Get the specified column's Date.
@@ -379,7 +273,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist,
* is null, is unset, or the type doesn't match the column's type
*/
- public Date getDate(String columnName) {
+ public final Date getDate(String columnName) {
return getDate(this.schema.getColumnIndex(columnName));
}
@@ -392,7 +286,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public Date getDate(int columnIndex) {
+ public final Date getDate(int columnIndex) {
checkValidColumn(columnIndex);
checkNull(columnIndex);
checkType(columnIndex, Type.DATE);
@@ -404,7 +298,7 @@ public class RowResult {
* Get the schema used for this scanner's column projection.
* @return a column projection as a schema.
*/
- public Schema getColumnProjection() {
+ public final Schema getColumnProjection() {
return this.schema;
}
@@ -415,7 +309,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public String getString(String columnName) {
+ public final String getString(String columnName) {
return getString(this.schema.getColumnIndex(columnName));
}
@@ -428,24 +322,12 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public String getString(int columnIndex) {
+ public final String getString(int columnIndex) {
checkType(columnIndex, Type.STRING);
return getVarLengthData(columnIndex);
}
- private String getVarLengthData(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.STRING, Type.VARCHAR);
- // C++ puts a Slice in rowData which is 16 bytes long for simplicity, but we only support ints.
- long offset = getLongOrOffset(columnIndex);
- long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
- assert offset < Integer.MAX_VALUE;
- assert length < Integer.MAX_VALUE;
- return Bytes.getString(indirectData.getRawArray(),
- indirectData.getRawOffset() + (int)offset,
- (int)length);
- }
+ protected abstract String getVarLengthData(int columnIndex);
/**
* Get the specified column's varchar.
@@ -455,7 +337,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public String getVarchar(int columnIndex) {
+ public final String getVarchar(int columnIndex) {
checkType(columnIndex, Type.VARCHAR);
return getVarLengthData(columnIndex);
}
@@ -467,7 +349,7 @@ public class RowResult {
* @throws IllegalArgumentException if the column doesn't exist, is null,
* or if the type doesn't match the column's type
*/
- public String getVarchar(String columnName) {
+ public final String getVarchar(String columnName) {
return getVarchar(this.schema.getColumnIndex(columnName));
}
@@ -479,7 +361,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public byte[] getBinaryCopy(String columnName) {
+ public final byte[] getBinaryCopy(String columnName) {
return getBinaryCopy(this.schema.getColumnIndex(columnName));
}
@@ -492,20 +374,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public byte[] getBinaryCopy(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
- // but we only support ints.
- long offset = getLongOrOffset(columnIndex);
- long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
- assert offset < Integer.MAX_VALUE;
- assert length < Integer.MAX_VALUE;
- byte[] ret = new byte[(int)length];
- System.arraycopy(indirectData.getRawArray(), indirectData.getRawOffset() + (int) offset,
- ret, 0, (int) length);
- return ret;
- }
+ public abstract byte[] getBinaryCopy(int columnIndex);
/**
* Get the specified column's binary data.
@@ -518,7 +387,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public ByteBuffer getBinary(String columnName) {
+ public final ByteBuffer getBinary(String columnName) {
return getBinary(this.schema.getColumnIndex(columnName));
}
@@ -533,31 +402,7 @@ public class RowResult {
* or if the type doesn't match the column's type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public ByteBuffer getBinary(int columnIndex) {
- checkValidColumn(columnIndex);
- checkNull(columnIndex);
- checkType(columnIndex, Type.BINARY);
- // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
- // but we only support ints.
- long offset = getLongOrOffset(columnIndex);
- long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
- assert offset < Integer.MAX_VALUE;
- assert length < Integer.MAX_VALUE;
- return ByteBuffer.wrap(indirectData.getRawArray(), indirectData.getRawOffset() + (int) offset,
- (int) length);
- }
-
- /**
- * Returns the long column value if the column type is INT64 or UNIXTIME_MICROS.
- * Returns the column's offset into the indirectData if the column type is BINARY or STRING.
- * @param columnIndex Column index in the schema
- * @return a long value for the column
- */
- long getLongOrOffset(int columnIndex) {
- return Bytes.getLong(this.rowData.getRawArray(),
- this.rowData.getRawOffset() +
- getCurrentRowDataOffsetForColumn(columnIndex));
- }
+ public abstract ByteBuffer getBinary(int columnIndex);
/**
* Get if the specified column is NULL
@@ -566,7 +411,7 @@ public class RowResult {
* false otherwise
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public boolean isNull(String columnName) {
+ public final boolean isNull(String columnName) {
return isNull(this.schema.getColumnIndex(columnName));
}
@@ -577,14 +422,7 @@ public class RowResult {
* false otherwise
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public boolean isNull(int columnIndex) {
- checkValidColumn(columnIndex);
- if (nullsBitSet == null) {
- return false;
- }
- return schema.getColumnByIndex(columnIndex).isNullable() &&
- nullsBitSet.get(columnIndex);
- }
+ public abstract boolean isNull(int columnIndex);
/**
* Get the specified column's value as an Object.
@@ -610,7 +448,7 @@ public class RowResult {
* @return the column's value as an Object, null if the value is null
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public Object getObject(String columnName) {
+ public final Object getObject(String columnName) {
return getObject(this.schema.getColumnIndex(columnName));
}
@@ -639,7 +477,7 @@ public class RowResult {
* @return the column's value as an Object, null if the value is null
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public Object getObject(int columnIndex) {
+ public final Object getObject(int columnIndex) {
checkValidColumn(columnIndex);
if (isNull(columnIndex)) {
return null;
@@ -668,7 +506,7 @@ public class RowResult {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
- public boolean hasIsDeleted() {
+ public final boolean hasIsDeleted() {
return schema.hasIsDeleted();
}
@@ -678,7 +516,7 @@ public class RowResult {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
- public boolean isDeleted() {
+ public final boolean isDeleted() {
return getBoolean(schema.getIsDeletedIndex());
}
@@ -687,7 +525,7 @@ public class RowResult {
* @param columnName name of the column
* @return a type
*/
- public Type getColumnType(String columnName) {
+ public final Type getColumnType(String columnName) {
return this.schema.getColumn(columnName).getType();
}
@@ -697,7 +535,7 @@ public class RowResult {
* @return a type
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- public Type getColumnType(int columnIndex) {
+ public final Type getColumnType(int columnIndex) {
return this.schema.getColumnByIndex(columnIndex).getType();
}
@@ -705,14 +543,14 @@ public class RowResult {
* Get the schema associated with this result.
* @return a schema
*/
- public Schema getSchema() {
+ public final Schema getSchema() {
return schema;
}
/**
* @throws IndexOutOfBoundsException if the column doesn't exist
*/
- private void checkValidColumn(int columnIndex) {
+ protected final void checkValidColumn(int columnIndex) {
if (columnIndex >= schema.getColumnCount()) {
throw new IndexOutOfBoundsException("Requested column is out of range, " +
columnIndex + " out of " + schema.getColumnCount());
@@ -722,7 +560,7 @@ public class RowResult {
/**
* @throws IllegalArgumentException if the column is null
*/
- private void checkNull(int columnIndex) {
+ protected final void checkNull(int columnIndex) {
if (!schema.hasNullableColumns()) {
return;
}
@@ -733,7 +571,7 @@ public class RowResult {
}
}
- private void checkType(int columnIndex, Type... types) {
+ protected final void checkType(int columnIndex, Type... types) {
ColumnSchema columnSchema = schema.getColumnByIndex(columnIndex);
Type columnType = columnSchema.getType();
for (Type type : types) {
@@ -746,11 +584,6 @@ public class RowResult {
columnType.getName() + " but was requested as a type " +
Arrays.toString(types));
}
- @Override
- public String toString() {
- return "RowResult index: " + this.index + ", size: " + this.rowSize;
- }
-
/**
* Return the actual data from this row in a stringified key=value
* form.
@@ -826,7 +659,7 @@ public class RowResult {
* the iterator as well as its data.
*/
public String toStringLongFormat() {
- StringBuilder buf = new StringBuilder(this.rowSize); // super rough estimation.
+ StringBuilder buf = new StringBuilder();
buf.append(this.toString());
buf.append("{");
buf.append(rowToString());
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
index b869988..d9abc30 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
@@ -18,14 +18,11 @@
package org.apache.kudu.client;
import java.util.Iterator;
-import java.util.NoSuchElementException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.kudu.Schema;
-import org.apache.kudu.WireProtocol;
-import org.apache.kudu.util.Slice;
/**
* Class that contains the rows sent by a tablet server, exhausting this iterator only means
@@ -34,18 +31,12 @@ import org.apache.kudu.util.Slice;
@InterfaceAudience.Public
@InterfaceStability.Evolving
@SuppressWarnings("IterableAndIterator")
-public class RowResultIterator extends KuduRpcResponse implements Iterator<RowResult>,
+public abstract class RowResultIterator extends KuduRpcResponse implements Iterator<RowResult>,
Iterable<RowResult> {
- private static final RowResultIterator EMPTY =
- new RowResultIterator(0, null, null, 0, null, null, false);
-
- private final Schema schema;
- private final Slice bs;
- private final Slice indirectBs;
- private final int numRows;
- private int currentRow = 0;
- private final RowResult sharedRowResult;
+ protected final Schema schema;
+ protected final int numRows;
+ protected int currentRow = 0;
/**
* Package private constructor, only meant to be instantiated from AsyncKuduScanner.
@@ -53,57 +44,23 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
* @param tsUUID UUID of the tablet server that handled our request
* @param schema schema used to parse the rows
* @param numRows how many rows are contained in the bs slice
- * @param bs normal row data
- * @param indirectBs indirect row data
*/
- private RowResultIterator(long elapsedMillis,
+ protected RowResultIterator(long elapsedMillis,
String tsUUID,
Schema schema,
int numRows,
- Slice bs,
- Slice indirectBs,
boolean reuseRowResult) {
super(elapsedMillis, tsUUID);
this.schema = schema;
this.numRows = numRows;
- this.bs = bs;
- this.indirectBs = indirectBs;
- this.sharedRowResult = (reuseRowResult && numRows != 0) ?
- new RowResult(this.schema, this.bs, this.indirectBs, -1) : null;
}
- static RowResultIterator makeRowResultIterator(long elapsedMillis,
- String tsUUID,
- Schema schema,
- WireProtocol.RowwiseRowBlockPB data,
- final CallResponse callResponse,
- boolean reuseRowResult)
- throws KuduException {
- if (data == null || data.getNumRows() == 0) {
- return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null, null, reuseRowResult);
- }
-
- Slice bs = callResponse.getSidecar(data.getRowsSidecar());
- Slice indirectBs = callResponse.getSidecar(data.getIndirectDataSidecar());
- int numRows = data.getNumRows();
-
- // Integrity check
- int rowSize = schema.getRowSize();
- int expectedSize = numRows * rowSize;
- if (expectedSize != bs.length()) {
- Status statusIllegalState = Status.IllegalState("RowResult block has " + bs.length() +
- " bytes of data but expected " + expectedSize + " for " + numRows + " rows");
- throw new NonRecoverableException(statusIllegalState);
- }
- return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs, indirectBs,
- reuseRowResult);
+ public int getNumRows() {
+ return this.numRows;
}
- /**
- * @return an empty row result iterator
- */
public static RowResultIterator empty() {
- return EMPTY;
+ return RowwiseRowResultIterator.empty();
}
@Override
@@ -112,38 +69,10 @@ public class RowResultIterator extends KuduRpcResponse implements Iterator<RowRe
}
@Override
- public RowResult next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- // If sharedRowResult is not null, we should reuse it for every next call.
- if (sharedRowResult != null) {
- this.sharedRowResult.advancePointerTo(this.currentRow++);
- return sharedRowResult;
- } else {
- return new RowResult(this.schema, this.bs, this.indirectBs, this.currentRow++);
- }
- }
-
- @Override
public void remove() {
throw new UnsupportedOperationException();
}
- /**
- * Get the number of rows in this iterator. If all you want is to count
- * rows, call this and skip the rest.
- * @return number of rows in this iterator
- */
- public int getNumRows() {
- return this.numRows;
- }
-
- @Override
- public String toString() {
- return "RowResultIterator for " + this.numRows + " rows";
- }
-
@Override
public Iterator<RowResult> iterator() {
return this;
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResult.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResult.java
new file mode 100644
index 0000000..aa54fd3
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResult.java
@@ -0,0 +1,372 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.BitSet;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.kudu.ColumnSchema;
+import org.apache.kudu.ColumnTypeAttributes;
+import org.apache.kudu.Schema;
+import org.apache.kudu.Type;
+import org.apache.kudu.util.Slice;
+import org.apache.kudu.util.TimestampUtil;
+
+/**
+ * RowResult represents one row from a scanner, in row-wise layout.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class RowwiseRowResult extends RowResult {
+
+ private final Slice rowData;
+ private final Slice indirectData;
+
+ private final int rowSize;
+ private final int[] columnOffsets;
+
+ private int offset;
+ private BitSet nullsBitSet;
+
+ /**
+ * Prepares the row representation using the provided data. Doesn't copy data
+ * out of the byte arrays. Package private.
+ * @param schema Schema used to build the rowData
+ * @param rowData The Slice of data returned by the tablet server
+ * @param indirectData The full indirect data that contains the strings
+ * @param rowIndex The index of the row in the rowData that this RowResult represents
+ */
+ RowwiseRowResult(Schema schema, Slice rowData, Slice indirectData, int rowIndex) {
+ super(schema, rowIndex);
+ this.rowData = rowData;
+ this.indirectData = indirectData;
+ this.rowSize = this.schema.getRowSize();
+
+ int columnOffsetsSize = schema.getColumnCount();
+ if (schema.hasNullableColumns()) {
+ columnOffsetsSize++;
+ }
+ columnOffsets = new int[columnOffsetsSize];
+ // Empty projection, usually used for quick row counting.
+ if (columnOffsetsSize == 0) {
+ return;
+ }
+ int currentOffset = 0;
+ columnOffsets[0] = currentOffset;
+ // Pre-compute the columns offsets in rowData for easier lookups later.
+ // If the schema has nullables, we also add the offset for the null bitmap at the end.
+ for (int i = 1; i < columnOffsetsSize; i++) {
+ org.apache.kudu.ColumnSchema column = schema.getColumnByIndex(i - 1);
+ int previousSize = column.getTypeSize();
+ columnOffsets[i] = previousSize + currentOffset;
+ currentOffset += previousSize;
+ }
+ advancePointerTo(rowIndex);
+ }
+
+ /**
+ * Package-protected, only meant to be used by the RowResultIterator
+ */
+ @Override
+ void advancePointerTo(int rowIndex) {
+ super.advancePointerTo(rowIndex);
+
+ this.offset = this.rowSize * this.index;
+ if (schema.hasNullableColumns() && this.index != INDEX_RESET_LOCATION) {
+ this.nullsBitSet = Bytes.toBitSet(
+ this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(schema.getColumnCount()),
+ schema.getColumnCount());
+ }
+ }
+
+ int getCurrentRowDataOffsetForColumn(int columnIndex) {
+ return this.offset + this.columnOffsets[columnIndex];
+ }
+
+ /**
+ * Get the specified column's integer
+ * @param columnIndex Column index in the schema
+ * @return an integer
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public int getInt(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT32, Type.DATE);
+ return Bytes.getInt(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's short
+ * @param columnIndex Column index in the schema
+ * @return a short
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public short getShort(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT16);
+ return Bytes.getShort(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's boolean
+ * @param columnIndex Column index in the schema
+ * @return a boolean
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public boolean getBoolean(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.BOOL);
+ byte b = Bytes.getByte(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(columnIndex));
+ return b == 1;
+ }
+
+ /**
+ * Get the specified column's byte
+ * @param columnIndex Column index in the schema
+ * @return a byte
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public byte getByte(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT8);
+ return Bytes.getByte(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's long
+ *
+ * If this is a UNIXTIME_MICROS column, the long value corresponds to a number of microseconds
+ * since midnight, January 1, 1970 UTC.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a positive long
+ * @throws IllegalArgumentException if the column is null
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public long getLong(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.INT64, Type.UNIXTIME_MICROS);
+ return Bytes.getLong(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's float
+ * @param columnIndex Column index in the schema
+ * @return a float
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public float getFloat(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.FLOAT);
+ return Bytes.getFloat(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's double
+ * @param columnIndex Column index in the schema
+ * @return a double
+ * @throws IllegalArgumentException if the column is null
+ * or if the type doesn't match the column's type
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public double getDouble(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.DOUBLE);
+ return Bytes.getDouble(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() +
+ getCurrentRowDataOffsetForColumn(columnIndex));
+ }
+
+ /**
+ * Get the specified column's Decimal.
+ *
+ * @param columnIndex Column index in the schema
+ * @return a BigDecimal.
+ * @throws IllegalArgumentException if the column is null
+ * @throws IndexOutOfBoundsException if the column doesn't exist
+ */
+ @Override
+ public BigDecimal getDecimal(int columnIndex) {
+ checkValidColumn(columnIndex);
+ checkNull(columnIndex);
+ checkType(columnIndex, Type.DECIMAL);
+ ColumnSchema column = schema.getColumnByIndex(columnIndex);
+ ColumnTypeAttributes typeAttributes = column.getTypeAttributes();
+ return Bytes.getDecimal(this.rowData.getRawArray(),
+ this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex),
+ typeAttributes.getPrecision(), typeAttributes.getScale());
+ }
+
+  /**
+   * Get the specified column's Timestamp.
+   *
+   * The stored value is read with getLong() and converted from microseconds
+   * with TimestampUtil.microsToTimestamp().
+   *
+   * @param columnIndex Column index in the schema
+   * @return a Timestamp
+   * @throws IllegalArgumentException if the column is null
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  @Override
+  public Timestamp getTimestamp(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    checkType(columnIndex, Type.UNIXTIME_MICROS);
+    long micros = getLong(columnIndex);
+    return TimestampUtil.microsToTimestamp(micros);
+  }
+
+  /**
+   * Decodes the value of a STRING or VARCHAR column into a String.
+   *
+   * @param columnIndex Column index in the schema
+   * @return the column's value as a String
+   * @throws IllegalArgumentException if the column is null
+   * or if the column's type is neither STRING nor VARCHAR
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  @Override
+  public String getVarLengthData(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    checkType(columnIndex, Type.STRING, Type.VARCHAR);
+    // The fixed-size cell is a 16-byte Slice written by the C++ side: an
+    // 8-byte offset into indirectData followed by an 8-byte length. Only
+    // int-sized offsets and lengths are supported here.
+    final long dataOffset = getOffset(columnIndex);
+    final long dataLength =
+        rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
+    assert dataOffset < Integer.MAX_VALUE;
+    assert dataLength < Integer.MAX_VALUE;
+    final byte[] backing = indirectData.getRawArray();
+    final int start = indirectData.getRawOffset() + (int) dataOffset;
+    return Bytes.getString(backing, start, (int) dataLength);
+  }
+
+  /**
+   * Get a copy of the specified column's binary data.
+   *
+   * The bytes are copied out of the indirect data into a newly allocated
+   * array, so the result does not share storage with this RowResult.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a byte[] with the binary data.
+   * @throws IllegalArgumentException if the column is null
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  @Override
+  public byte[] getBinaryCopy(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    // Enforce the documented type contract, matching getBinary(): previously
+    // any column type was silently decoded as a slice.
+    checkType(columnIndex, Type.BINARY);
+    // C++ puts a Slice in rowData which is 16 bytes long for simplicity,
+    // but we only support ints.
+    long offset = getOffset(columnIndex);
+    long length = rowData.getLong(getCurrentRowDataOffsetForColumn(columnIndex) + 8);
+    assert offset < Integer.MAX_VALUE;
+    assert length < Integer.MAX_VALUE;
+    byte[] ret = new byte[(int) length];
+    System.arraycopy(indirectData.getRawArray(), indirectData.getRawOffset() + (int) offset,
+        ret, 0, (int) length);
+    return ret;
+  }
+
+  /**
+   * Returns the specified BINARY column's value without copying it.
+   *
+   * The returned ByteBuffer wraps the RowResult's backing indirect-data
+   * array. Its position and limit delimit exactly the column's bytes.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a ByteBuffer with the binary data.
+   * @throws IllegalArgumentException if the column is null
+   * or if the type doesn't match the column's type
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  @Override
+  public ByteBuffer getBinary(int columnIndex) {
+    checkValidColumn(columnIndex);
+    checkNull(columnIndex);
+    checkType(columnIndex, Type.BINARY);
+    // The fixed-size cell is a 16-byte Slice written by the C++ side: an
+    // 8-byte offset into indirectData followed by an 8-byte length. Only
+    // int-sized offsets and lengths are supported here.
+    final long dataOffset = getOffset(columnIndex);
+    final int lengthPosition = getCurrentRowDataOffsetForColumn(columnIndex) + 8;
+    final long dataLength = rowData.getLong(lengthPosition);
+    assert dataOffset < Integer.MAX_VALUE;
+    assert dataLength < Integer.MAX_VALUE;
+    final byte[] backing = indirectData.getRawArray();
+    final int start = indirectData.getRawOffset() + (int) dataOffset;
+    return ByteBuffer.wrap(backing, start, (int) dataLength);
+  }
+
+  /**
+   * Reads the 8-byte value stored in the given column's fixed-size cell.
+   *
+   * For INT64 or UNIXTIME_MICROS columns this is the column value itself.
+   * For variable-length columns (STRING, VARCHAR, BINARY) it is the offset
+   * of the value's bytes within indirectData. No column, null or type
+   * validation is performed here.
+   *
+   * @param columnIndex Column index in the schema
+   * @return a long value for the column
+   */
+  long getOffset(int columnIndex) {
+    final int cellStart =
+        this.rowData.getRawOffset() + getCurrentRowDataOffsetForColumn(columnIndex);
+    return Bytes.getLong(this.rowData.getRawArray(), cellStart);
+  }
+
+  /**
+   * Reports whether the given column holds NULL at the current row.
+   *
+   * A column that is not nullable in the schema always reports false, as
+   * does any column when no null bitmap is present.
+   *
+   * @param columnIndex Column index in the schema
+   * @return true if the cell is null and the column is nullable, false otherwise
+   * @throws IndexOutOfBoundsException if the column doesn't exist
+   */
+  @Override
+  public boolean isNull(int columnIndex) {
+    checkValidColumn(columnIndex);
+    return nullsBitSet != null
+        && schema.getColumnByIndex(columnIndex).isNullable()
+        && nullsBitSet.get(columnIndex);
+  }
+
+
+  /**
+   * Describes this result by its row index and row size.
+   */
+  @Override
+  public String toString() {
+    return new StringBuilder("RowResult(Rowwise) index: ")
+        .append(this.index)
+        .append(", size: ")
+        .append(this.rowSize)
+        .toString();
+  }
+
+}
diff --git
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResultIterator.java
similarity index 55%
copy from
java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
copy to
java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResultIterator.java
index b869988..9c6c440 100644
---
a/java/kudu-client/src/main/java/org/apache/kudu/client/RowResultIterator.java
+++
b/java/kudu-client/src/main/java/org/apache/kudu/client/RowwiseRowResultIterator.java
@@ -17,7 +17,6 @@
package org.apache.kudu.client;
-import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.yetus.audience.InterfaceAudience;
@@ -28,23 +27,19 @@ import org.apache.kudu.WireProtocol;
import org.apache.kudu.util.Slice;
/**
- * Class that contains the rows sent by a tablet server, exhausting this
iterator only means
- * that all the rows from the last server response were read.
+ * Class that contains the rows in row-wise layout sent by a tablet server,
+ * exhausting this iterator only means that all the rows from the last server
response were read.
*/
-@InterfaceAudience.Public
+@InterfaceAudience.Private
@InterfaceStability.Evolving
@SuppressWarnings("IterableAndIterator")
-public class RowResultIterator extends KuduRpcResponse implements
Iterator<RowResult>,
- Iterable<RowResult> {
+class RowwiseRowResultIterator extends RowResultIterator {
- private static final RowResultIterator EMPTY =
- new RowResultIterator(0, null, null, 0, null, null, false);
+ private static final RowwiseRowResultIterator EMPTY =
+ new RowwiseRowResultIterator(0, null, null, 0, null, null, false);
- private final Schema schema;
private final Slice bs;
private final Slice indirectBs;
- private final int numRows;
- private int currentRow = 0;
private final RowResult sharedRowResult;
/**
@@ -55,32 +50,34 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
* @param numRows how many rows are contained in the bs slice
* @param bs normal row data
* @param indirectBs indirect row data
+ * @param reuseRowResult reuse same row result for next row
*/
- private RowResultIterator(long elapsedMillis,
- String tsUUID,
- Schema schema,
- int numRows,
- Slice bs,
- Slice indirectBs,
- boolean reuseRowResult) {
- super(elapsedMillis, tsUUID);
- this.schema = schema;
- this.numRows = numRows;
+ RowwiseRowResultIterator(long elapsedMillis,
+ String tsUUID,
+ Schema schema,
+ int numRows,
+ Slice bs,
+ Slice indirectBs,
+ boolean reuseRowResult) {
+ super(elapsedMillis, tsUUID, schema, numRows, reuseRowResult);
this.bs = bs;
this.indirectBs = indirectBs;
+
+ // Allocate one shared RowResult only when reuse is requested and there are
+ // rows; otherwise next() builds a new RowwiseRowResult for every row.
this.sharedRowResult = (reuseRowResult && numRows != 0) ?
- new RowResult(this.schema, this.bs, this.indirectBs, -1) : null;
+ new RowwiseRowResult(this.schema, this.bs, this.indirectBs, -1) :
null;
+
}
- static RowResultIterator makeRowResultIterator(long elapsedMillis,
- String tsUUID,
- Schema schema,
-
WireProtocol.RowwiseRowBlockPB data,
- final CallResponse
callResponse,
- boolean reuseRowResult)
+ static RowwiseRowResultIterator makeRowResultIterator(long elapsedMillis,
+ String tsUUID,
+ Schema schema,
+
WireProtocol.RowwiseRowBlockPB data,
+ final CallResponse
callResponse,
+ boolean reuseRowResult)
throws KuduException {
if (data == null || data.getNumRows() == 0) {
- return new RowResultIterator(elapsedMillis, tsUUID, schema, 0, null,
null, reuseRowResult);
+ return new RowwiseRowResultIterator(elapsedMillis, tsUUID, schema, 0,
+ null, null, reuseRowResult);
}
Slice bs = callResponse.getSidecar(data.getRowsSidecar());
@@ -95,23 +92,18 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
" bytes of data but expected " + expectedSize + " for " + numRows +
" rows");
throw new NonRecoverableException(statusIllegalState);
}
- return new RowResultIterator(elapsedMillis, tsUUID, schema, numRows, bs,
indirectBs,
- reuseRowResult);
+ return new RowwiseRowResultIterator(elapsedMillis, tsUUID, schema, numRows,
+ bs, indirectBs, reuseRowResult);
}
/**
+ * Returns the shared {@code EMPTY} instance (constructed with zero rows)
+ * instead of allocating a new iterator.
+ *
* @return an empty row result iterator
*/
- public static RowResultIterator empty() {
+ public static RowwiseRowResultIterator empty() {
return EMPTY;
}
@Override
- public boolean hasNext() {
- return this.currentRow < numRows;
- }
-
- @Override
public RowResult next() {
if (!hasNext()) {
throw new NoSuchElementException();
@@ -121,31 +113,12 @@ public class RowResultIterator extends KuduRpcResponse
implements Iterator<RowRe
this.sharedRowResult.advancePointerTo(this.currentRow++);
return sharedRowResult;
} else {
- return new RowResult(this.schema, this.bs, this.indirectBs,
this.currentRow++);
+ return new RowwiseRowResult(this.schema, this.bs, this.indirectBs,
this.currentRow++);
}
}
@Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Get the number of rows in this iterator. If all you want is to count
- * rows, call this and skip the rest.
- * @return number of rows in this iterator
- */
- public int getNumRows() {
- return this.numRows;
- }
-
- @Override
public String toString() {
- return "RowResultIterator for " + this.numRows + " rows";
- }
-
- @Override
- public Iterator<RowResult> iterator() {
- return this;
+ return "RowwiseRowResultIterator for " + this.numRows + " rows";
}
}
diff --git
a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
index 8b8bf0c..2624cc3 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRowResult.java
@@ -27,7 +27,6 @@ import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
-import java.sql.Date;
import java.sql.Timestamp;
import org.junit.Before;
@@ -47,6 +46,9 @@ public class TestRowResult {
private static final Schema allTypesSchema = getSchemaWithAllTypes();
+ // insert 5 rows to test result iterations
+ private static final int TEST_ROWS = 5;
+
private KuduTable table;
@Rule
@@ -56,128 +58,164 @@ public class TestRowResult {
public void setUp() throws Exception {
harness.getClient().createTable(TABLE_NAME, allTypesSchema,
getAllTypesCreateTableOptions());
table = harness.getClient().openTable(TABLE_NAME);
+
+    // Seed TEST_ROWS rows. Column 0 holds the row ordinal so checkRows() can
+    // tell the rows apart; every other column gets the same fixed value.
+    KuduClient client = harness.getClient();
+    KuduSession session = client.newSession();
+    try {
+      for (int i = 0; i < TEST_ROWS; i++) {
+        Insert insert = table.newInsert();
+        PartialRow row = insert.getRow();
+
+        row.addByte(0, (byte) i);
+        row.addShort(1, (short) 2);
+        row.addInt(2, 3);
+        row.addLong(3, 4L);
+        row.addBoolean(4, true);
+        row.addFloat(5, 5.6f);
+        row.addDouble(6, 7.8);
+        row.addString(7, "string-value");
+        row.addBinary(8, "binary-array".getBytes(UTF_8));
+        ByteBuffer bb = ByteBuffer.wrap("binary-bytebuffer".getBytes(UTF_8));
+        // We're only inserting the bytebuffer part of the original array.
+        bb.position(7);
+        row.addBinary(9, bb);
+        row.setNull(10);
+        row.addTimestamp(11, new Timestamp(11));
+        row.addDecimal(12, BigDecimal.valueOf(12345, 3));
+        row.addVarchar(13, "varcharval");
+        row.addDate(14, DateUtil.epochDaysToSqlDate(0));
+
+        // Fail at the insert that went wrong rather than later, as an opaque
+        // row-count mismatch in checkRows().
+        OperationResponse response = session.apply(insert);
+        if (response != null && response.hasRowError()) {
+          throw new AssertionError("Failed to insert row " + i + ": " + response.getRowError());
+        }
+      }
+    } finally {
+      session.close();
+    }
}
@Test(timeout = 10000)
- public void test() throws Exception {
- Insert insert = table.newInsert();
- PartialRow row = insert.getRow();
-
- row.addByte(0, (byte) 1);
- row.addShort(1, (short) 2);
- row.addInt(2, 3);
- row.addLong(3, 4L);
- row.addBoolean(4, true);
- row.addFloat(5, 5.6f);
- row.addDouble(6, 7.8);
- row.addString(7, "string-value");
- row.addBinary(8, "binary-array".getBytes(UTF_8));
- ByteBuffer bb = ByteBuffer.wrap("binary-bytebuffer".getBytes(UTF_8));
- bb.position(7); // We're only inserting the bytebuffer part of the
original array.
- row.addBinary(9, bb);
- row.setNull(10);
- row.addTimestamp(11, new Timestamp(11));
- row.addDecimal(12, BigDecimal.valueOf(12345, 3));
- row.addVarchar(13, "varcharval");
- row.addDate(14, DateUtil.epochDaysToSqlDate(0));
+  public void testRowwiseRowset() throws Exception {
+    // No row-data format or reuse option is set, so the scanner's defaults are exercised.
+    checkRows(harness.getClient().newScannerBuilder(table).build());
+  }
+ @Test(timeout = 10000)
+ public void testRowwiseRowsetReuse() throws Exception {
+ // Row-wise scan with setReuseRowResult(true): the iterator hands back one
+ // shared RowResult that is re-pointed at each successive row.
KuduClient client = harness.getClient();
- KuduSession session = client.newSession();
- session.apply(insert);
+ KuduScanner scanner = client.newScannerBuilder(table).build();
+ scanner.setReuseRowResult(true);
+ checkRows(scanner);
+ }
+ @Test(timeout = 10000)
+ public void testColumnarRowset() throws Exception {
+ // Same value checks as the row-wise tests, but the scanner requests
+ // COLUMNAR row data, so the columnar result path is exercised.
+ KuduClient client = harness.getClient();
KuduScanner scanner = client.newScannerBuilder(table).build();
+ scanner.setRowDataFormat(AsyncKuduScanner.RowDataFormat.COLUMNAR);
+ checkRows(scanner);
+ }
+
+  @Test(timeout = 10000)
+  public void testColumnarRowsetReuse() throws Exception {
+    // Columnar row data combined with a single reused RowResult.
+    KuduScanner columnarScanner = harness.getClient().newScannerBuilder(table).build();
+    columnarScanner.setRowDataFormat(AsyncKuduScanner.RowDataFormat.COLUMNAR);
+    columnarScanner.setReuseRowResult(true);
+    checkRows(columnarScanner);
+  }
+
+  /**
+   * Drains {@code scanner} and verifies that exactly TEST_ROWS rows come back,
+   * each carrying the values written in setUp(). Rows are counted across all
+   * nextRows() batches, so the check holds however the scan splits the data.
+   */
+  private void checkRows(KuduScanner scanner) throws KuduException {
+    int rowsSeen = 0;
+    int rowsReported = 0;
while (scanner.hasMoreRows()) {
RowResultIterator it = scanner.nextRows();
- assertTrue(it.hasNext());
- RowResult rr = it.next();
-
- assertEquals((byte) 1, rr.getByte(0));
- assertEquals((byte) 1, rr.getObject(0));
- assertEquals((byte) 1,
rr.getByte(allTypesSchema.getColumnByIndex(0).getName()));
-
- assertEquals((short) 2, rr.getShort(1));
- assertEquals((short) 2, rr.getObject(1));
- assertEquals((short) 2,
rr.getShort(allTypesSchema.getColumnByIndex(1).getName()));
-
- assertEquals(3, rr.getInt(2));
- assertEquals(3, rr.getObject(2));
- assertEquals(3, rr.getInt(allTypesSchema.getColumnByIndex(2).getName()));
-
- assertEquals((long) 4, rr.getLong(3));
- assertEquals((long) 4, rr.getObject(3));
- assertEquals((long) 4,
rr.getLong(allTypesSchema.getColumnByIndex(3).getName()));
-
- assertEquals(true, rr.getBoolean(4));
- assertEquals(true, rr.getObject(4));
- assertEquals(true,
rr.getBoolean(allTypesSchema.getColumnByIndex(4).getName()));
-
- assertEquals(5.6f, rr.getFloat(5), .001f);
- assertEquals(5.6f, (float) rr.getObject(5), .001f);
- assertEquals(5.6f,
- rr.getFloat(allTypesSchema.getColumnByIndex(5).getName()), .001f);
-
- assertEquals(7.8, rr.getDouble(6), .001);
- assertEquals(7.8, (double) rr.getObject(6), .001);
- assertEquals(7.8,
- rr.getDouble(allTypesSchema.getColumnByIndex(6).getName()), .001f);
-
- assertEquals("string-value", rr.getString(7));
- assertEquals("string-value", rr.getObject(7));
- assertEquals("string-value",
- rr.getString(allTypesSchema.getColumnByIndex(7).getName()));
-
- assertArrayEquals("binary-array".getBytes(UTF_8), rr.getBinaryCopy(8));
- assertArrayEquals("binary-array".getBytes(UTF_8), (byte[])
rr.getObject(8));
- assertArrayEquals("binary-array".getBytes(UTF_8),
- rr.getBinaryCopy(allTypesSchema.getColumnByIndex(8).getName()));
-
- ByteBuffer buffer = rr.getBinary(8);
- assertEquals(buffer,
rr.getBinary(allTypesSchema.getColumnByIndex(8).getName()));
- byte[] binaryValue = new byte[buffer.remaining()];
- buffer.get(binaryValue);
- assertArrayEquals("binary-array".getBytes(UTF_8), binaryValue);
-
- assertArrayEquals("bytebuffer".getBytes(UTF_8), rr.getBinaryCopy(9));
-
- assertEquals(true, rr.isNull(10));
- assertNull(rr.getObject(10));
- assertEquals(true,
rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
-
- assertEquals(new Timestamp(11), rr.getTimestamp(11));
- assertEquals(new Timestamp(11), rr.getObject(11));
- assertEquals(new Timestamp(11),
- rr.getTimestamp(allTypesSchema.getColumnByIndex(11).getName()));
-
- assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(12));
- assertEquals(BigDecimal.valueOf(12345, 3), rr.getObject(12));
- assertEquals(BigDecimal.valueOf(12345, 3),
- rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
-
- assertEquals("varcharval", rr.getVarchar(13));
- assertEquals("varcharval", rr.getObject(13));
- assertEquals("varcharval",
- rr.getVarchar(allTypesSchema.getColumnByIndex(13).getName()));
-
- assertEquals(DateUtil.epochDaysToSqlDate(0), rr.getDate(14));
- assertEquals(DateUtil.epochDaysToSqlDate(0), rr.getObject(14));
- assertEquals(DateUtil.epochDaysToSqlDate(0),
- rr.getDate(allTypesSchema.getColumnByIndex(14).getName()));
-
- // We test with the column name once since it's the same method for all
types, unlike above.
- assertEquals(Type.INT8,
rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
- assertEquals(Type.INT8, rr.getColumnType(0));
- assertEquals(Type.INT16, rr.getColumnType(1));
- assertEquals(Type.INT32, rr.getColumnType(2));
- assertEquals(Type.INT64, rr.getColumnType(3));
- assertEquals(Type.BOOL, rr.getColumnType(4));
- assertEquals(Type.FLOAT, rr.getColumnType(5));
- assertEquals(Type.DOUBLE, rr.getColumnType(6));
- assertEquals(Type.STRING, rr.getColumnType(7));
- assertEquals(Type.BINARY, rr.getColumnType(8));
- assertEquals(Type.UNIXTIME_MICROS, rr.getColumnType(11));
- assertEquals(Type.DECIMAL, rr.getColumnType(12));
- assertEquals(Type.VARCHAR, rr.getColumnType(13));
- assertEquals(Type.DATE, rr.getColumnType(14));
+      // getNumRows() must agree in total with what next() actually yields.
+      rowsReported += it.getNumRows();
+      while (it.hasNext()) {
+        RowResult rr = it.next();
+        // Assumes rows come back in insertion order, so column 0 equals the
+        // running ordinal written by setUp().
+        int i = rowsSeen++;
+
+        assertEquals((byte) i, rr.getByte(0));
+        assertEquals((byte) i, rr.getObject(0));
+        assertEquals((byte) i, rr.getByte(allTypesSchema.getColumnByIndex(0).getName()));
+
+        assertEquals((short) 2, rr.getShort(1));
+        assertEquals((short) 2, rr.getObject(1));
+        assertEquals((short) 2, rr.getShort(allTypesSchema.getColumnByIndex(1).getName()));
+
+        assertEquals(3, rr.getInt(2));
+        assertEquals(3, rr.getObject(2));
+        assertEquals(3, rr.getInt(allTypesSchema.getColumnByIndex(2).getName()));
+
+        assertEquals((long) 4, rr.getLong(3));
+        assertEquals((long) 4, rr.getObject(3));
+        assertEquals((long) 4, rr.getLong(allTypesSchema.getColumnByIndex(3).getName()));
+
+        assertEquals(true, rr.getBoolean(4));
+        assertEquals(true, rr.getObject(4));
+        assertEquals(true, rr.getBoolean(allTypesSchema.getColumnByIndex(4).getName()));
+
+        assertEquals(5.6f, rr.getFloat(5), .001f);
+        assertEquals(5.6f, (float) rr.getObject(5), .001f);
+        assertEquals(5.6f, rr.getFloat(allTypesSchema.getColumnByIndex(5).getName()), .001f);
+
+        assertEquals(7.8, rr.getDouble(6), .001);
+        assertEquals(7.8, (double) rr.getObject(6), .001);
+        assertEquals(7.8, rr.getDouble(allTypesSchema.getColumnByIndex(6).getName()), .001);
+
+        assertEquals("string-value", rr.getString(7));
+        assertEquals("string-value", rr.getObject(7));
+        assertEquals("string-value",
+            rr.getString(allTypesSchema.getColumnByIndex(7).getName()));
+
+        assertArrayEquals("binary-array".getBytes(UTF_8), rr.getBinaryCopy(8));
+        assertArrayEquals("binary-array".getBytes(UTF_8), (byte[]) rr.getObject(8));
+        assertArrayEquals("binary-array".getBytes(UTF_8),
+            rr.getBinaryCopy(allTypesSchema.getColumnByIndex(8).getName()));
+
+        ByteBuffer buffer = rr.getBinary(8);
+        assertEquals(buffer, rr.getBinary(allTypesSchema.getColumnByIndex(8).getName()));
+        byte[] binaryValue = new byte[buffer.remaining()];
+        buffer.get(binaryValue);
+        assertArrayEquals("binary-array".getBytes(UTF_8), binaryValue);
+
+        assertArrayEquals("bytebuffer".getBytes(UTF_8), rr.getBinaryCopy(9));
+
+        assertEquals(true, rr.isNull(10));
+        assertNull(rr.getObject(10));
+        assertEquals(true, rr.isNull(allTypesSchema.getColumnByIndex(10).getName()));
+
+        assertEquals(new Timestamp(11), rr.getTimestamp(11));
+        assertEquals(new Timestamp(11), rr.getObject(11));
+        assertEquals(new Timestamp(11),
+            rr.getTimestamp(allTypesSchema.getColumnByIndex(11).getName()));
+
+        assertEquals(BigDecimal.valueOf(12345, 3), rr.getDecimal(12));
+        assertEquals(BigDecimal.valueOf(12345, 3), rr.getObject(12));
+        assertEquals(BigDecimal.valueOf(12345, 3),
+            rr.getDecimal(allTypesSchema.getColumnByIndex(12).getName()));
+
+        assertEquals("varcharval", rr.getVarchar(13));
+        assertEquals("varcharval", rr.getObject(13));
+        assertEquals("varcharval",
+            rr.getVarchar(allTypesSchema.getColumnByIndex(13).getName()));
+
+        assertEquals(DateUtil.epochDaysToSqlDate(0), rr.getDate(14));
+        assertEquals(DateUtil.epochDaysToSqlDate(0), rr.getObject(14));
+        assertEquals(DateUtil.epochDaysToSqlDate(0),
+            rr.getDate(allTypesSchema.getColumnByIndex(14).getName()));
+
+        // We test with the column name once since it's the same method for all types,
+        // unlike above.
+        assertEquals(Type.INT8, rr.getColumnType(allTypesSchema.getColumnByIndex(0).getName()));
+        assertEquals(Type.INT8, rr.getColumnType(0));
+        assertEquals(Type.INT16, rr.getColumnType(1));
+        assertEquals(Type.INT32, rr.getColumnType(2));
+        assertEquals(Type.INT64, rr.getColumnType(3));
+        assertEquals(Type.BOOL, rr.getColumnType(4));
+        assertEquals(Type.FLOAT, rr.getColumnType(5));
+        assertEquals(Type.DOUBLE, rr.getColumnType(6));
+        assertEquals(Type.STRING, rr.getColumnType(7));
+        assertEquals(Type.BINARY, rr.getColumnType(8));
+        assertEquals(Type.UNIXTIME_MICROS, rr.getColumnType(11));
+        assertEquals(Type.DECIMAL, rr.getColumnType(12));
+        assertEquals(Type.VARCHAR, rr.getColumnType(13));
+        assertEquals(Type.DATE, rr.getColumnType(14));
+      }
}
+    // Every inserted row must be seen exactly once, however the scan batched them.
+    assertEquals(TEST_ROWS, rowsSeen);
+    assertEquals(TEST_ROWS, rowsReported);
}
}