This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new b2c2d96  DRILL-7799: REST Queries Fail if Caching is Enabled
b2c2d96 is described below

commit b2c2d96375ae8c461e66269092af4be50013d074
Author: Charles Givre <[email protected]>
AuthorDate: Thu Oct 29 08:53:37 2020 -0400

    DRILL-7799: REST Queries Fail if Caching is Enabled
---
 .../drill/exec/store/http/HttpBatchReader.java     |  9 +++++
 .../drill/exec/store/http/HttpCSVBatchReader.java  |  6 +++
 .../drill/exec/store/http/HttpGroupScan.java       | 45 +++++++++++++++++++++-
 .../apache/drill/exec/store/http/HttpSubScan.java  | 17 ++++++--
 .../drill/exec/store/http/util/SimpleHttp.java     | 18 +++++----
 .../drill/exec/store/http/TestHttpPlugin.java      | 11 ++++++
 6 files changed, 94 insertions(+), 12 deletions(-)

diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 1b322dd..83058e2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -40,10 +40,13 @@ import java.util.Map;
 
 public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
   private final HttpSubScan subScan;
+  private final int maxRecords;
   private JsonLoader jsonLoader;
+  private int recordCount;
 
   public HttpBatchReader(HttpSubScan subScan) {
     this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
   }
 
   @Override
@@ -141,6 +144,12 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
 
   @Override
   public boolean next() {
+    recordCount++;
+
+    // Stop after the limit has been reached
+    if (maxRecords >= 1 && recordCount > maxRecords) {
+      return false;
+    }
     return jsonLoader.readBatch();
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index f7db13b..831f6bd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -44,6 +44,7 @@ import java.util.List;
 public class HttpCSVBatchReader extends HttpBatchReader {
   private final HttpSubScan subScan;
   private final CsvParserSettings csvSettings;
+  private final int maxRecords;
   private CsvParser csvReader;
   private List<StringColumnWriter> columnWriters;
   private String[] firstRow;
@@ -55,6 +56,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
   public HttpCSVBatchReader(HttpSubScan subScan) {
     super(subScan);
     this.subScan = subScan;
+    this.maxRecords = subScan.maxRecords();
 
     this.csvSettings = new CsvParserSettings();
     csvSettings.setLineSeparatorDetectionEnabled(true);
@@ -102,6 +104,10 @@ public class HttpCSVBatchReader extends HttpBatchReader {
   @Override
   public boolean next() {
     while (!rowWriter.isFull()) {
+      if (rowWriter.limitReached(maxRecords)) {
+        return false;
+      }
+
       if (!processRow()) {
         return false;
       }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index ff0f7d6..4f2d9e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -47,6 +47,7 @@ public class HttpGroupScan extends AbstractGroupScan {
   private final Map<String, String> filters;
   private final ScanStats scanStats;
   private final double filterSelectivity;
+  private final int maxRecords;
 
   // Used only in planner, not serialized
   private int hashCode;
@@ -61,6 +62,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = null;
     this.filterSelectivity = 0.0;
     this.scanStats = computeScanStats();
+    this.maxRecords = -1;
   }
 
   /**
@@ -72,6 +74,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.columns = that.columns;
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
+    this.maxRecords = that.maxRecords;
 
     // Calcite makes many copies in the later stage of planning
     // without changing anything. Retain the previous stats.
@@ -92,6 +95,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = that.filters;
     this.filterSelectivity = that.filterSelectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = that.maxRecords;
   }
 
   /**
@@ -107,9 +111,26 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = filters;
     this.filterSelectivity = filterSelectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = that.maxRecords;
   }
 
   /**
+   * Adds a limit to the scan.
+   */
+  public HttpGroupScan(HttpGroupScan that, int maxRecords) {
+    super(that);
+    this.columns = that.columns;
+    this.httpScanSpec = that.httpScanSpec;
+
+    // Applies a filter.
+    this.filters = that.filters;
+    this.filterSelectivity = that.filterSelectivity;
+    this.scanStats = computeScanStats();
+    this.maxRecords = maxRecords;
+  }
+
+
+  /**
    * Deserialize a group scan. Not called in normal operation. Probably used
    * only if Drill executes a logical plan.
    */
@@ -118,7 +139,8 @@ public class HttpGroupScan extends AbstractGroupScan {
     @JsonProperty("columns") List<SchemaPath> columns,
     @JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
     @JsonProperty("filters") Map<String, String> filters,
-    @JsonProperty("filterSelectivity") double selectivity
+    @JsonProperty("filterSelectivity") double selectivity,
+    @JsonProperty("maxRecords") int maxRecords
   ) {
     super("no-user");
     this.columns = columns;
@@ -126,6 +148,7 @@ public class HttpGroupScan extends AbstractGroupScan {
     this.filters = filters;
     this.filterSelectivity = selectivity;
     this.scanStats = computeScanStats();
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty("columns")
@@ -161,7 +184,7 @@ public class HttpGroupScan extends AbstractGroupScan {
 
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
-    return new HttpSubScan(httpScanSpec, columns, filters);
+    return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
   }
 
   @Override
@@ -175,6 +198,10 @@ public class HttpGroupScan extends AbstractGroupScan {
     return toString();
   }
 
+  @JsonProperty("maxRecords")
+  public int maxRecords() { return maxRecords; }
+
+
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
@@ -226,11 +253,25 @@ public class HttpGroupScan extends AbstractGroupScan {
   }
 
   @Override
+  public boolean supportsLimitPushdown() {
+    return true;
+  }
+
+  @Override
+  public GroupScan applyLimit(int maxRecords) {
+    if (maxRecords == this.maxRecords) {
+      return null;
+    }
+    return new HttpGroupScan(this, maxRecords);
+  }
+
+  @Override
   public String toString() {
     return new PlanStringBuilder(this)
       .field("scan spec", httpScanSpec)
       .field("columns", columns)
       .field("filters", filters)
+      .field("maxRecords", maxRecords)
       .toString();
   }
 
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 1706c7c..d818824 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -41,16 +41,20 @@ public class HttpSubScan extends AbstractBase implements SubScan {
   private final HttpScanSpec tableSpec;
   private final List<SchemaPath> columns;
   private final Map<String, String> filters;
+  private final int maxRecords;
 
   @JsonCreator
   public HttpSubScan(
     @JsonProperty("tableSpec") HttpScanSpec tableSpec,
     @JsonProperty("columns") List<SchemaPath> columns,
-    @JsonProperty("filters") Map<String, String> filters) {
+    @JsonProperty("filters") Map<String, String> filters,
+    @JsonProperty("maxRecords") int maxRecords
+    ) {
     super("user-if-needed");
     this.tableSpec = tableSpec;
     this.columns = columns;
     this.filters = filters;
+    this.maxRecords = maxRecords;
   }
 
   @JsonProperty("tableSpec")
@@ -68,6 +72,11 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     return filters;
   }
 
+  @JsonProperty("maxRecords")
+  public int maxRecords() {
+    return maxRecords;
+  }
+
  @Override
   public <T, X, E extends Throwable> T accept(
    PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -76,7 +85,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
 
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
-    return new HttpSubScan(tableSpec, columns, filters);
+    return new HttpSubScan(tableSpec, columns, filters, maxRecords);
   }
 
   @Override
@@ -96,6 +105,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
       .field("tableSpec", tableSpec)
       .field("columns", columns)
       .field("filters", filters)
+      .field("maxRecords", maxRecords)
       .toString();
   }
 
@@ -115,6 +125,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
     HttpSubScan other = (HttpSubScan) obj;
     return Objects.equals(tableSpec, other.tableSpec)
       && Objects.equals(columns, other.columns)
-      && Objects.equals(filters, other.filters);
+      && Objects.equals(filters, other.filters)
+      && Objects.equals(maxRecords, other.maxRecords);
   }
 }
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 8918e74..5cb9991 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -206,14 +206,18 @@ public class SimpleHttp {
   private void setupCache(Builder builder) {
     int cacheSize = 10 * 1024 * 1024;   // TODO Add cache size in MB to config
     File cacheDirectory = new File(tempDir, "http-cache");
-    if (!cacheDirectory.mkdirs()) {
-      throw UserException.dataWriteError()
-        .message("Could not create the HTTP cache directory")
-        .addContext("Path", cacheDirectory.getAbsolutePath())
-        .addContext("Please check the temp directory or disable HTTP caching.")
-        .addContext(errorContext)
-        .build(logger);
+    if (!cacheDirectory.exists()) {
+      if (!cacheDirectory.mkdirs()) {
+        throw UserException
+          .dataWriteError()
+          .message("Could not create the HTTP cache directory")
+          .addContext("Path", cacheDirectory.getAbsolutePath())
+          .addContext("Please check the temp directory or disable HTTP caching.")
+          .addContext(errorContext)
+          .build(logger);
+      }
     }
+
     try {
       Cache cache = new Cache(cacheDirectory, cacheSize);
       logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 4c20e99..442aa5f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -452,6 +452,17 @@ public class TestHttpPlugin extends ClusterTest {
   }
 
   @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
+
+  @Test
   public void testSlowResponse() throws Exception {
     try (MockWebServer server = startServer()) {
 

Reply via email to