This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a54e9d  DRILL-7877 - Streaming REST API Fails to Send Multiple Batches (#2189)
6a54e9d is described below

commit 6a54e9dd57b3b4cc8e77824856d6e884f2a6013b
Author: Charles S. Givre <[email protected]>
AuthorDate: Thu Mar 18 13:56:09 2021 -0400

    DRILL-7877 - Streaming REST API Fails to Send Multiple Batches (#2189)
    
    * DRILL-7877 - Streaming REST API Fails to Send Multiple Batches
    
    * Alternative fix
    
    * Add missing nullStateReader.bindIndex calls
    
    Co-authored-by: Volodymyr Vysotskyi <[email protected]>
---
 .../exec/physical/resultSet/model/ReaderIndex.java | 16 +++++-----
 .../resultSet/model/single/DirectRowIndex.java     |  2 +-
 .../drill/exec/physical/rowSet/HyperRowIndex.java  |  2 +-
 .../exec/physical/rowSet/IndirectRowIndex.java     |  2 +-
 .../drill/exec/server/rest/TestRestJson.java       | 10 ++++++
 exec/java-exec/src/test/resources/rest/group.json  | 26 ++++++++++++++++
 .../accessor/reader/AbstractTupleReader.java       |  2 ++
 .../vector/accessor/reader/ArrayReaderImpl.java    |  1 +
 .../vector/accessor/reader/BaseScalarReader.java   |  1 +
 .../vector/accessor/reader/NullStateReader.java    |  1 +
 .../vector/accessor/reader/NullStateReaders.java   | 36 ++++++++++++++++++++--
 .../vector/accessor/reader/UnionReaderImpl.java    | 10 +++---
 12 files changed, 93 insertions(+), 16 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
index cdf3ec9..d36f074 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/ReaderIndex.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.resultSet.model;
 
 import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
 
+import java.util.function.Supplier;
+
 /**
  * Row set index base class used when indexing rows within a row
  * set for a row set reader. Keeps track of the current position,
@@ -29,14 +31,14 @@ import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
 public abstract class ReaderIndex implements ColumnReaderIndex {
 
   protected int position = -1;
-  protected final int rowCount;
+  protected final Supplier<Integer> rowCount;
 
-  public ReaderIndex(int rowCount) {
+  public ReaderIndex(Supplier<Integer> rowCount) {
     this.rowCount = rowCount;
   }
 
   public void set(int index) {
-    assert position >= -1 && position <= rowCount;
+    assert position >= -1 && position <= rowCount.get();
     position = index;
   }
 
@@ -44,19 +46,19 @@ public abstract class ReaderIndex implements ColumnReaderIndex {
   public int logicalIndex() { return position; }
 
   @Override
-  public int size() { return rowCount; }
+  public int size() { return rowCount.get(); }
 
   @Override
   public boolean next() {
-    if (++position < rowCount) {
+    if (++position < rowCount.get()) {
       return true;
     }
-    position = rowCount;
+    position = rowCount.get();
     return false;
   }
 
   @Override
   public boolean hasNext() {
-    return position + 1 < rowCount;
+    return position + 1 < rowCount.get();
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
index 8fada11..768c9d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/resultSet/model/single/DirectRowIndex.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer;
 public class DirectRowIndex extends ReaderIndex {
 
   public DirectRowIndex(VectorContainer container) {
-    super(container.getRecordCount());
+    super(container::getRecordCount);
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
index 1e1cd5b..5551ee1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/HyperRowIndex.java
@@ -31,7 +31,7 @@ public class HyperRowIndex extends ReaderIndex {
   private final SelectionVector4 sv4;
 
   public HyperRowIndex(SelectionVector4 sv4) {
-    super(sv4.getCount());
+    super(sv4::getCount);
     this.sv4 = sv4;
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
index 8aa0770..16880bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/rowSet/IndirectRowIndex.java
@@ -34,7 +34,7 @@ public class IndirectRowIndex extends ReaderIndex {
   private final SelectionVector2 sv2;
 
   public IndirectRowIndex(SelectionVector2 sv2) {
-    super(sv2.getCount());
+    super(sv2::getCount);
     this.sv2 = sv2;
   }
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
index 2c55459..a204fc3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/rest/TestRestJson.java
@@ -118,6 +118,16 @@ public class TestRestJson extends ClusterTest {
   }
 
   @Test
+  public void testGroupby() throws IOException {
+    File outFile = new File(dirTestWatcher.getTmpDir(), "group.json");
+    String sql = "SELECT position_title, COUNT(*) as pc FROM cp.`employee.json` GROUP BY position_title";
+    QueryWrapper query = new QueryWrapper(sql, QueryType.SQL.name(),
+      null, null, null, null);
+    runQuery(query, outFile);
+    verifier.verifyFileWithResource(outFile, "group.json");
+  }
+
+  @Test
   public void testNoLimit() throws IOException {
     File outFile = new File(dirTestWatcher.getTmpDir(), "cust20.json");
     String sql = "SELECT * FROM cp.`employee.json` LIMIT 20";
diff --git a/exec/java-exec/src/test/resources/rest/group.json b/exec/java-exec/src/test/resources/rest/group.json
new file mode 100644
index 0000000..af3f171
--- /dev/null
+++ b/exec/java-exec/src/test/resources/rest/group.json
@@ -0,0 +1,26 @@
+!\{"queryId":"[^"]+"
+,"columns":["position_title","pc"]
+,"metadata":["VARCHAR","BIGINT"]
+,"attemptedAutoLimit":0
+,"rows":[
+{"position_title":"President","pc":1}
+,{"position_title":"VP Country Manager","pc":6}
+,{"position_title":"VP Information Systems","pc":1}
+,{"position_title":"VP Human Resources","pc":1}
+,{"position_title":"Store Manager","pc":24}
+,{"position_title":"VP Finance","pc":1}
+,{"position_title":"HQ Marketing","pc":3}
+,{"position_title":"HQ Information Systems","pc":4}
+,{"position_title":"HQ Human Resources","pc":2}
+,{"position_title":"HQ Finance and Accounting","pc":8}
+,{"position_title":"Store Assistant Manager","pc":24}
+,{"position_title":"Store Shift Supervisor","pc":52}
+,{"position_title":"Store Permanent Butcher","pc":32}
+,{"position_title":"Store Information Systems","pc":16}
+,{"position_title":"Store Permanent Checker","pc":226}
+,{"position_title":"Store Temporary Checker","pc":268}
+,{"position_title":"Store Permanent Stocker","pc":222}
+,{"position_title":"Store Temporary Stocker","pc":264}
+]
+,"queryState":"COMPLETED"
+}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
index 0c699cd..0a2a0f0 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/AbstractTupleReader.java
@@ -82,6 +82,7 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
     for (AbstractObjectReader reader : readers) {
       reader.events().bindIndex(index);
     }
+    nullStateReader.bindIndex(index);
   }
 
   @Override
@@ -94,6 +95,7 @@ public abstract class AbstractTupleReader implements TupleReader, ReaderEvents {
     for (AbstractObjectReader reader : readers) {
       reader.events().bindBuffer();
     }
+    nullStateReader.bindBuffer();
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
index 474b8d7..1313a32 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/ArrayReaderImpl.java
@@ -373,6 +373,7 @@ public class ArrayReaderImpl implements ArrayReader, ReaderEvents {
   @Override
   public void bindBuffer() {
     elementReader.events().bindBuffer();
+    nullStateReader.bindBuffer();
   }
 
   @Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
index 9770452..6e76173 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/BaseScalarReader.java
@@ -169,6 +169,7 @@ public abstract class BaseScalarReader extends AbstractScalarReader {
   @Override
   public void bindBuffer() {
     bufferAccessor.rebind();
+    nullStateReader.bindBuffer();
   }
 
   public final DrillBuf buffer() {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
index e52c0f5..3715087 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReader.java
@@ -49,4 +49,5 @@ import org.apache.drill.exec.vector.accessor.ColumnReaderIndex;
 public interface NullStateReader {
   void bindIndex(ColumnReaderIndex rowIndex);
   boolean isNull();
+  void bindBuffer();
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
index 0385091..c4d9152 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/NullStateReaders.java
@@ -42,6 +42,10 @@ public class NullStateReaders {
 
     @Override
     public boolean isNull() { return false; }
+
+    @Override
+    public void bindBuffer() {
+    }
   }
 
   /**
@@ -76,6 +80,11 @@ public class NullStateReaders {
     public boolean isNull() {
       return isSetReader.getInt() == 0;
     }
+
+    @Override
+    public void bindBuffer() {
+      isSetReader.bindBuffer();
+    }
   }
 
   /**
@@ -109,6 +118,11 @@ public class NullStateReaders {
     public boolean isNull() {
       return isSetReader.getInt() == 0;
     }
+
+    @Override
+    public void bindBuffer() {
+      isSetReader.bindBuffer();
+    }
   }
 
   /**
@@ -138,6 +152,11 @@ public class NullStateReaders {
     public boolean isNull() {
       return unionNullState.isNull() || memberNullState.isNull();
     }
+
+    @Override
+    public void bindBuffer() {
+      memberNullState.bindBuffer();
+    }
   }
 
   /**
@@ -150,8 +169,8 @@ public class NullStateReaders {
 
   protected static class ComplexMemberStateReader implements NullStateReader {
 
-    private UInt1ColumnReader typeReader;
-    private MinorType type;
+    private final UInt1ColumnReader typeReader;
+    private final MinorType type;
 
     public ComplexMemberStateReader(UInt1ColumnReader typeReader, MinorType type) {
       this.typeReader = typeReader;
@@ -165,6 +184,10 @@ public class NullStateReaders {
     public boolean isNull() {
       return typeReader.getInt() != type.getNumber();
     }
+
+    @Override
+    public void bindBuffer() {
+    }
   }
 
   /**
@@ -189,6 +212,11 @@ public class NullStateReaders {
     public boolean isNull() {
       return typeReader.getInt() == UnionVector.NULL_MARKER;
     }
+
+    @Override
+    public void bindBuffer() {
+      typeReader.bindBuffer();
+    }
   }
 
   protected static class AlwaysNullStateReader implements NullStateReader {
@@ -201,5 +229,9 @@ public class NullStateReaders {
     public boolean isNull() {
       return true;
     }
+
+    @Override
+    public void bindBuffer() {
+    }
   }
 }
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
index a1720ba..28a184e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/accessor/reader/UnionReaderImpl.java
@@ -129,7 +129,7 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
     }
   }
 
-  public static AbstractObjectReader build(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader variants[]) {
+  public static AbstractObjectReader build(ColumnMetadata schema, VectorAccessor va, AbstractObjectReader[] variants) {
     return new UnionObjectReader(
         new UnionReaderImpl(schema, va, variants));
   }
@@ -145,9 +145,10 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
     unionAccessor.bind(index);
     typeAccessor.bind(index);
     typeReader.bindIndex(index);
-    for (int i = 0; i < variants.length; i++) {
-      if (variants[i] != null) {
-        variants[i].events().bindIndex(index);
+    nullStateReader.bindIndex(index);
+    for (AbstractObjectReader variant : variants) {
+      if (variant != null) {
+        variant.events().bindIndex(index);
       }
     }
   }
@@ -190,6 +191,7 @@ public class UnionReaderImpl implements VariantReader, ReaderEvents {
         variantReader.events().bindBuffer();
       }
     }
+    nullStateReader.bindBuffer();
   }
 
   @Override

Reply via email to