This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 6a54e9d DRILL-7877 - Streaming REST API Fails to Send Multiple
Batches (#2189)
6a54e9d is described below
commit 6a54e9dd57b3b4cc8e77824856d6e884f2a6013b
Author: Charles S. Givre <[email protected]>
AuthorDate: Thu Mar 18 13:56:09 2021 -0400
DRILL-7877 - Streaming REST API Fails to Send Multiple Batches (#2189)
* DRILL-7877 - Streaming REST API Fails to Send Multiple Batches
* Alternative fix
* Add missing nullStateReader.bindIndex calls
Co-authored-by: Volodymyr Vysotskyi <[email protected]>
---
.../exec/physical/resultSet/model/ReaderIndex.java | 16 +++++-----
.../resultSet/model/single/DirectRowIndex.java | 2 +-
.../drill/exec/physical/rowSet/HyperRowIndex.java | 2 +-
.../exec/physical/rowSet/IndirectRowIndex.java | 2 +-
.../drill/exec/server/rest/TestRestJson.java | 10 ++++++
exec/java-exec/src/test/resources/rest/group.json | 26 ++++++++++++++++
.../accessor/reader/AbstractTupleReader.java | 2 ++
.../vector/accessor/reader/ArrayReaderImpl.java | 1 +
.../vector/accessor/reader/BaseScalarReader.java | 1 +
.../vector/accessor/reader/NullStateReader.java | 1 +
.../vector/accessor/reader/NullStateReaders.java | 36 ++++++++++++++++++++--
.../vector/accessor/reader/UnionReaderImpl.java | 10 +++---
12 files changed, 93 insertions(+), 16 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
index cdf3ec9..d36f074 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.resultSet.model;
import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
+import java.util.function.Supplier;
+
/**
* Row set index base class used when indexing rows within a row
* set for a row set reader. Keeps track of the current position,
@@ -29,14 +31,14 @@ import
org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
public abstract class ReaderIndex implements ColumnReaderIndex {
protected int position = -1;
- protected final int rowCount;
+ protected final Supplier<Integer> rowCount;
- public ReaderIndex(int rowCount) {
+ public ReaderIndex(Supplier<Integer> rowCount) {
this.rowCount = rowCount;
}
public void set(int index) {
- assert position >= -1 && position <= rowCount;
+ assert position >= -1 && position <= rowCount.get();
position = index;
}
@@ -44,19 +46,19 @@ public abstract class ReaderIndex implements
ColumnReaderIndex {
public int logicalIndex() { return position; }
@Override
- public int size() { return rowCount; }
+ public int size() { return rowCount.get(); }
@Override
public boolean next() {
- if (++position < rowCount) {
+ if (++position < rowCount.get()) {
return true;
}
- position = rowCount;
+ position = rowCount.get();
return false;
}
@Override
public boolean hasNext() {
- return position + 1 < rowCount;
+ return position + 1 < rowCount.get();
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
index 8fada11..768c9d3 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer;
public class DirectRowIndex extends ReaderIndex {
public DirectRowIndex(VectorContainer container) {
- super(container.getRecordCount());
+ super(container::getRecordCount);
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
index 1e1cd5b..5551ee1 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
@@ -31,7 +31,7 @@ public class HyperRowIndex extends ReaderIndex {
private final SelectionVector4 sv4;
public HyperRowIndex(SelectionVector4 sv4) {
- super(sv4.getCount());
+ super(sv4::getCount);
this.sv4 = sv4;
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
index 8aa0770..16880bb 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
@@ -34,7 +34,7 @@ public class IndirectRowIndex extends ReaderIndex {
private final SelectionVector2 sv2;
public IndirectRowIndex(SelectionVector2 sv2) {
- super(sv2.getCount());
+ super(sv2::getCount);
this.sv2 = sv2;
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
index 2c55459..a204fc3 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
@@ -118,6 +118,16 @@ public class TestRestJson extends ClusterTest {
}
@Test
+ public void testGroupby() throws IOException {
+ File outFile = new File(dirTestWatcher.getTmpDir(), "group.json");
+ String sql = "SELECT position_title, COUNT(*) as pc FROM
cp.`employee.json` GROUP BY position_title";
+ QueryWrapper query = new QueryWrapper(sql, QueryType.SQL.name(),
+ null, null, null, null);
+ runQuery(query, outFile);
+ verifier.verifyFileWithResource(outFile, "group.json");
+ }
+
+ @Test
public void testNoLimit() throws IOException {
File outFile = new File(dirTestWatcher.getTmpDir(), "cust20.json");
String sql = "SELECT * FROM cp.`employee.json` LIMIT 20";
diff --git a/exec/java-exec/src/test/resources/rest/group.json
b/exec/java-exec/src/test/resources/rest/group.json
new file mode 100644
index 0000000..af3f171
--- /dev/null
+++ b/exec/java-exec/src/test/resources/rest/group.json
@@ -0,0 +1,26 @@
+!\{"queryId":"[^"]+"
+,"columns":["position_title","pc"]
+,"metadata":["VARCHAR","BIGINT"]
+,"attemptedAutoLimit":0
+,"rows":[
+{"position_title":"President","pc":1}
+,{"position_title":"VP Country Manager","pc":6}
+,{"position_title":"VP Information Systems","pc":1}
+,{"position_title":"VP Human Resources","pc":1}
+,{"position_title":"Store Manager","pc":24}
+,{"position_title":"VP Finance","pc":1}
+,{"position_title":"HQ Marketing","pc":3}
+,{"position_title":"HQ Information Systems","pc":4}
+,{"position_title":"HQ Human Resources","pc":2}
+,{"position_title":"HQ Finance and Accounting","pc":8}
+,{"position_title":"Store Assistant Manager","pc":24}
+,{"position_title":"Store Shift Supervisor","pc":52}
+,{"position_title":"Store Permanent Butcher","pc":32}
+,{"position_title":"Store Information Systems","pc":16}
+,{"position_title":"Store Permanent Checker","pc":226}
+,{"position_title":"Store Temporary Checker","pc":268}
+,{"position_title":"Store Permanent Stocker","pc":222}
+,{"position_title":"Store Temporary Stocker","pc":264}
+]
+,"queryState":"COMPLETED"
+}
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 0c699cd..0a2a0f0 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -82,6 +82,7 @@ public abstract class AbstractTupleReader implements
TupleReader, ReaderEvents {
for (AbstractObjectReader reader : readers) {
reader.events().bindIndex(index);
}
+ nullStateReader.bindIndex(index);
}
@Override
@@ -94,6 +95,7 @@ public abstract class AbstractTupleReader implements
TupleReader, ReaderEvents {
for (AbstractObjectReader reader : readers) {
reader.events().bindBuffer();
}
+ nullStateReader.bindBuffer();
}
@Override
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index 474b8d7..1313a32 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -373,6 +373,7 @@ public class ArrayReaderImpl implements ArrayReader,
ReaderEvents {
@Override
public void bindBuffer() {
elementReader.events().bindBuffer();
+ nullStateReader.bindBuffer();
}
@Override
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
index 9770452..6e76173 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
@@ -169,6 +169,7 @@ public abstract class BaseScalarReader extends
AbstractScalarReader {
@Override
public void bindBuffer() {
bufferAccessor.rebind();
+ nullStateReader.bindBuffer();
}
public final DrillBuf buffer() {
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
index e52c0f5..3715087 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
@@ -49,4 +49,5 @@ import
org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
public interface NullStateReader {
void bindIndex(ColumnReaderIndex rowIndex);
boolean isNull();
+ void bindBuffer();
}
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
index 0385091..c4d9152 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
@@ -42,6 +42,10 @@ public class NullStateReaders {
@Override
public boolean isNull() { return false; }
+
+ @Override
+ public void bindBuffer() {
+ }
}
/**
@@ -76,6 +80,11 @@ public class NullStateReaders {
public boolean isNull() {
return isSetReader.getInt() == 0;
}
+
+ @Override
+ public void bindBuffer() {
+ isSetReader.bindBuffer();
+ }
}
/**
@@ -109,6 +118,11 @@ public class NullStateReaders {
public boolean isNull() {
return isSetReader.getInt() == 0;
}
+
+ @Override
+ public void bindBuffer() {
+ isSetReader.bindBuffer();
+ }
}
/**
@@ -138,6 +152,11 @@ public class NullStateReaders {
public boolean isNull() {
return unionNullState.isNull() || memberNullState.isNull();
}
+
+ @Override
+ public void bindBuffer() {
+ memberNullState.bindBuffer();
+ }
}
/**
@@ -150,8 +169,8 @@ public class NullStateReaders {
protected static class ComplexMemberStateReader implements NullStateReader {
- private UInt1ColumnReader typeReader;
- private MinorType type;
+ private final UInt1ColumnReader typeReader;
+ private final MinorType type;
public ComplexMemberStateReader(UInt1ColumnReader typeReader, MinorType
type) {
this.typeReader = typeReader;
@@ -165,6 +184,10 @@ public class NullStateReaders {
public boolean isNull() {
return typeReader.getInt() != type.getNumber();
}
+
+ @Override
+ public void bindBuffer() {
+ }
}
/**
@@ -189,6 +212,11 @@ public class NullStateReaders {
public boolean isNull() {
return typeReader.getInt() == UnionVector.NULL_MARKER;
}
+
+ @Override
+ public void bindBuffer() {
+ typeReader.bindBuffer();
+ }
}
protected static class AlwaysNullStateReader implements NullStateReader {
@@ -201,5 +229,9 @@ public class NullStateReaders {
public boolean isNull() {
return true;
}
+
+ @Override
+ public void bindBuffer() {
+ }
}
}
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index a1720ba..28a184e 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -129,7 +129,7 @@ public class UnionReaderImpl implements VariantReader,
ReaderEvents {
}
}
- public static AbstractObjectReader build(ColumnMetadata schema,
VectorAccessor va, AbstractObjectReader variants[]) {
+ public static AbstractObjectReader build(ColumnMetadata schema,
VectorAccessor va, AbstractObjectReader[] variants) {
return new UnionObjectReader(
new UnionReaderImpl(schema, va, variants));
}
@@ -145,9 +145,10 @@ public class UnionReaderImpl implements VariantReader,
ReaderEvents {
unionAccessor.bind(index);
typeAccessor.bind(index);
typeReader.bindIndex(index);
- for (int i = 0; i < variants.length; i++) {
- if (variants[i] != null) {
- variants[i].events().bindIndex(index);
+ nullStateReader.bindIndex(index);
+ for (AbstractObjectReader variant : variants) {
+ if (variant != null) {
+ variant.events().bindIndex(index);
}
}
}
@@ -190,6 +191,7 @@ public class UnionReaderImpl implements VariantReader,
ReaderEvents {
variantReader.events().bindBuffer();
}
}
+ nullStateReader.bindBuffer();
}
@Override