[CALCITE-1429] Druid adapter must send "fromNext" when requesting rows from Druid (Jiarong Wei)

Close apache/calcite#303


Project: http://git-wip-us.apache.org/repos/asf/calcite/repo
Commit: http://git-wip-us.apache.org/repos/asf/calcite/commit/97ccd6de
Tree: http://git-wip-us.apache.org/repos/asf/calcite/tree/97ccd6de
Diff: http://git-wip-us.apache.org/repos/asf/calcite/diff/97ccd6de

Branch: refs/heads/master
Commit: 97ccd6ded2b9910a551d12f610100371aad6c6a8
Parents: aad03de
Author: Jiarong Wei <vca...@gmail.com>
Authored: Tue Oct 11 22:53:55 2016 +0800
Committer: Julian Hyde <jh...@apache.org>
Committed: Tue Oct 18 16:38:05 2016 -0700

----------------------------------------------------------------------
 .../calcite/adapter/druid/DruidConnectionImpl.java   |  3 +++
 .../org/apache/calcite/adapter/druid/DruidQuery.java |  5 ++---
 .../java/org/apache/calcite/test/DruidAdapterIT.java | 15 +++++++--------
 3 files changed, 12 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/calcite/blob/97ccd6de/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
index 839d2c9..872b6e9 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidConnectionImpl.java
@@ -179,6 +179,7 @@ class DruidConnectionImpl implements DruidConnection {
             && parser.nextToken() == JsonToken.START_OBJECT) {
           page.pagingIdentifier = null;
           page.offset = -1;
+          page.totalRowCount = 0;
           expectScalarField(parser, DEFAULT_RESPONSE_TIMESTAMP_COLUMN);
           if (parser.nextToken() == JsonToken.FIELD_NAME
               && parser.getCurrentName().equals("result")
@@ -208,6 +209,7 @@ class DruidConnectionImpl implements DruidConnection {
                   parseFields(fieldNames, fieldTypes, posTimestampField, rowBuilder, parser);
                   sink.send(rowBuilder.build());
                   rowBuilder.reset();
+                  page.totalRowCount += 1;
                 }
                 expect(parser, JsonToken.END_OBJECT);
               }
@@ -569,6 +571,7 @@ class DruidConnectionImpl implements DruidConnection {
   static class Page {
     String pagingIdentifier = null;
     int offset = -1;
+    int totalRowCount = 0;
 
     @Override public String toString() {
       return "{" + pagingIdentifier + ": " + offset + "}";

http://git-wip-us.apache.org/repos/asf/calcite/blob/97ccd6de/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
----------------------------------------------------------------------
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
index b5d6e30..94a1bab 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidQuery.java
@@ -628,6 +628,7 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
         generator.writeStartObject();
         generator.writeNumberField("threshold", fetch != null ? fetch
             : CalciteConnectionProperty.DRUID_FETCH.wrap(new Properties()).getInt());
+        generator.writeBooleanField("fromNext", true);
         generator.writeEndObject();
 
         generator.writeFieldName("context");
@@ -928,16 +929,14 @@ public class DruidQuery extends AbstractRelNode implements BindableRel {
               query.druidTable.schema.coordinatorUrl);
       final boolean limitQuery = containsLimit(querySpec);
       final DruidConnectionImpl.Page page = new DruidConnectionImpl.Page();
-      int previousOffset;
       do {
-        previousOffset = page.offset;
         final String queryString =
             querySpec.getQueryString(page.pagingIdentifier, page.offset);
         connection.request(querySpec.queryType, queryString, sink,
             querySpec.fieldNames, fieldTypes, page);
       } while (!limitQuery
           && page.pagingIdentifier != null
-          && page.offset > previousOffset);
+          && page.totalRowCount > 0);
     }
 
     private static boolean containsLimit(QuerySpec querySpec) {

http://git-wip-us.apache.org/repos/asf/calcite/blob/97ccd6de/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
----------------------------------------------------------------------
diff --git a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
index 5de85e3..3a52814 100644
--- a/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
+++ b/druid/src/test/java/org/apache/calcite/test/DruidAdapterIT.java
@@ -202,7 +202,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{'queryType':'select',"
         + "'dataSource':'wikiticker','descending':false,"
         + "'intervals':['1900-01-01T00:00:00.000Z/3000-01-01T00:00:00.000Z'],"
-        + 
"'dimensions':[],'metrics':[],'granularity':'all','pagingSpec':{'threshold':1},"
+        + 
"'dimensions':[],'metrics':[],'granularity':'all','pagingSpec':{'threshold':1,'fromNext':true},"
         + "'context':{'druid.query.fetch':true}}";
     sql(sql, WIKI_AUTO2)
         .returnsUnordered("__time=2015-09-12 00:46:58")
@@ -397,7 +397,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
         + 
"'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
         + 
"'dimensions':['state_province','product_name'],'metrics':[],'granularity':'all',"
-        + 
"'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
+        + 
"'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .runs()
         .queryContains(druidChecker(druidQuery));
@@ -409,7 +409,7 @@ public class DruidAdapterIT {
     final String druidQuery = "{'queryType':'select','dataSource':'foodmart',"
         + 
"'descending':false,'intervals':['1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z'],"
         + 
"'dimensions':['gender','state_province'],'metrics':[],'granularity':'all',"
-        + "'pagingSpec':{'threshold':3},'context':{'druid.query.fetch':true}}";
+        + 
"'pagingSpec':{'threshold':3,'fromNext':true},'context':{'druid.query.fetch':true}}";
     sql(sql)
         .runs()
         .queryContains(druidChecker(druidQuery));
@@ -458,7 +458,7 @@ public class DruidAdapterIT {
         + 
"'meat_sqft','coffee_bar','video_store','salad_bar','prepared_food','florist','time_id','the_day',"
         + 
"'the_month','the_year','day_of_month','week_of_year','month_of_year','quarter','fiscal_period'],"
         + 
"'metrics':['unit_sales','store_sales','store_cost'],'granularity':'all',"
-        + 
"'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
+        + 
"'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
         .returns(
@@ -506,7 +506,7 @@ public class DruidAdapterIT {
         + 
"'florist','time_id','the_day','the_month','the_year','day_of_month',"
         + "'week_of_year','month_of_year','quarter','fiscal_period'],"
         + 
"'metrics':['unit_sales','store_sales','store_cost'],'granularity':'all',"
-        + 
"'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
+        + 
"'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
         .returnsUnordered()
@@ -536,7 +536,7 @@ public class DruidAdapterIT {
         + 
"'meat_sqft','coffee_bar','video_store','salad_bar','prepared_food','florist','time_id','the_day',"
         + 
"'the_month','the_year','day_of_month','week_of_year','month_of_year','quarter','fiscal_period'],"
         + 
"'metrics':['unit_sales','store_sales','store_cost'],'granularity':'all',"
-        + 
"'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
+        + 
"'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     sql(sql)
         .limit(4)
         .returns(
@@ -913,7 +913,7 @@ public class DruidAdapterIT {
         + "'dimensions':['state_province','city','product_name'],"
         + "'metrics':[],"
         + "'granularity':'all',"
-        + 
"'pagingSpec':{'threshold':16384},'context':{'druid.query.fetch':false}}";
+        + 
"'pagingSpec':{'threshold':16384,'fromNext':true},'context':{'druid.query.fetch':false}}";
     final String explain = "PLAN=EnumerableInterpreter\n"
         + "  DruidQuery(table=[[foodmart, foodmart]], 
intervals=[[1900-01-09T00:00:00.000Z/2992-01-10T00:00:00.000Z]],"
         + " filter=[AND(=(CAST($3):VARCHAR(24) CHARACTER SET \"ISO-8859-1\" 
COLLATE \"ISO-8859-1$en_US$primary\", 'High Top Dried Mushrooms'),"
@@ -926,7 +926,6 @@ public class DruidAdapterIT {
         .explainContains(explain)
         .returnsUnordered(
             "state_province=WA; city=Bremerton; product_name=High Top Dried 
Mushrooms",
-            "state_province=WA; city=Bremerton; product_name=High Top Dried 
Mushrooms",
             "state_province=WA; city=Everett; product_name=High Top Dried 
Mushrooms",
             "state_province=WA; city=Kirkland; product_name=High Top Dried 
Mushrooms",
             "state_province=WA; city=Lynnwood; product_name=High Top Dried 
Mushrooms",

Reply via email to