This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new b2c2d96 DRILL-7799: REST Queries Fail if Caching is Enabled
b2c2d96 is described below
commit b2c2d96375ae8c461e66269092af4be50013d074
Author: Charles Givre <[email protected]>
AuthorDate: Thu Oct 29 08:53:37 2020 -0400
DRILL-7799: REST Queries Fail if Caching is Enabled
---
.../drill/exec/store/http/HttpBatchReader.java | 9 +++++
.../drill/exec/store/http/HttpCSVBatchReader.java | 6 +++
.../drill/exec/store/http/HttpGroupScan.java | 45 +++++++++++++++++++++-
.../apache/drill/exec/store/http/HttpSubScan.java | 17 ++++++--
.../drill/exec/store/http/util/SimpleHttp.java | 18 +++++----
.../drill/exec/store/http/TestHttpPlugin.java | 11 ++++++
6 files changed, 94 insertions(+), 12 deletions(-)
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 1b322dd..83058e2 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -40,10 +40,13 @@ import java.util.Map;
public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
private final HttpSubScan subScan;
+ private final int maxRecords;
private JsonLoader jsonLoader;
+ private int recordCount;
public HttpBatchReader(HttpSubScan subScan) {
this.subScan = subScan;
+ this.maxRecords = subScan.maxRecords();
}
@Override
@@ -141,6 +144,12 @@ public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
@Override
public boolean next() {
+ recordCount++;
+
+ // Stop after the limit has been reached
+ if (maxRecords >= 1 && recordCount > maxRecords) {
+ return false;
+ }
return jsonLoader.readBatch();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index f7db13b..831f6bd 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -44,6 +44,7 @@ import java.util.List;
public class HttpCSVBatchReader extends HttpBatchReader {
private final HttpSubScan subScan;
private final CsvParserSettings csvSettings;
+ private final int maxRecords;
private CsvParser csvReader;
private List<StringColumnWriter> columnWriters;
private String[] firstRow;
@@ -55,6 +56,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
public HttpCSVBatchReader(HttpSubScan subScan) {
super(subScan);
this.subScan = subScan;
+ this.maxRecords = subScan.maxRecords();
this.csvSettings = new CsvParserSettings();
csvSettings.setLineSeparatorDetectionEnabled(true);
@@ -102,6 +104,10 @@ public class HttpCSVBatchReader extends HttpBatchReader {
@Override
public boolean next() {
while (!rowWriter.isFull()) {
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
if (!processRow()) {
return false;
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
index ff0f7d6..4f2d9e1 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpGroupScan.java
@@ -47,6 +47,7 @@ public class HttpGroupScan extends AbstractGroupScan {
private final Map<String, String> filters;
private final ScanStats scanStats;
private final double filterSelectivity;
+ private final int maxRecords;
// Used only in planner, not serialized
private int hashCode;
@@ -61,6 +62,7 @@ public class HttpGroupScan extends AbstractGroupScan {
this.filters = null;
this.filterSelectivity = 0.0;
this.scanStats = computeScanStats();
+ this.maxRecords = -1;
}
/**
@@ -72,6 +74,7 @@ public class HttpGroupScan extends AbstractGroupScan {
this.columns = that.columns;
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
+ this.maxRecords = that.maxRecords;
// Calcite makes many copies in the later stage of planning
// without changing anything. Retain the previous stats.
@@ -92,6 +95,7 @@ public class HttpGroupScan extends AbstractGroupScan {
this.filters = that.filters;
this.filterSelectivity = that.filterSelectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = that.maxRecords;
}
/**
@@ -107,9 +111,26 @@ public class HttpGroupScan extends AbstractGroupScan {
this.filters = filters;
this.filterSelectivity = filterSelectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = that.maxRecords;
}
/**
+ * Adds a limit to the scan.
+ */
+ public HttpGroupScan(HttpGroupScan that, int maxRecords) {
+ super(that);
+ this.columns = that.columns;
+ this.httpScanSpec = that.httpScanSpec;
+
+ // Applies a filter.
+ this.filters = that.filters;
+ this.filterSelectivity = that.filterSelectivity;
+ this.scanStats = computeScanStats();
+ this.maxRecords = maxRecords;
+ }
+
+
+ /**
* Deserialize a group scan. Not called in normal operation. Probably used
* only if Drill executes a logical plan.
*/
@@ -118,7 +139,8 @@ public class HttpGroupScan extends AbstractGroupScan {
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("httpScanSpec") HttpScanSpec httpScanSpec,
@JsonProperty("filters") Map<String, String> filters,
- @JsonProperty("filterSelectivity") double selectivity
+ @JsonProperty("filterSelectivity") double selectivity,
+ @JsonProperty("maxRecords") int maxRecords
) {
super("no-user");
this.columns = columns;
@@ -126,6 +148,7 @@ public class HttpGroupScan extends AbstractGroupScan {
this.filters = filters;
this.filterSelectivity = selectivity;
this.scanStats = computeScanStats();
+ this.maxRecords = maxRecords;
}
@JsonProperty("columns")
@@ -161,7 +184,7 @@ public class HttpGroupScan extends AbstractGroupScan {
@Override
public SubScan getSpecificScan(int minorFragmentId) {
- return new HttpSubScan(httpScanSpec, columns, filters);
+ return new HttpSubScan(httpScanSpec, columns, filters, maxRecords);
}
@Override
@@ -175,6 +198,10 @@ public class HttpGroupScan extends AbstractGroupScan {
return toString();
}
+ @JsonProperty("maxRecords")
+ public int maxRecords() { return maxRecords; }
+
+
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
@@ -226,11 +253,25 @@ public class HttpGroupScan extends AbstractGroupScan {
}
@Override
+ public boolean supportsLimitPushdown() {
+ return true;
+ }
+
+ @Override
+ public GroupScan applyLimit(int maxRecords) {
+ if (maxRecords == this.maxRecords) {
+ return null;
+ }
+ return new HttpGroupScan(this, maxRecords);
+ }
+
+ @Override
public String toString() {
return new PlanStringBuilder(this)
.field("scan spec", httpScanSpec)
.field("columns", columns)
.field("filters", filters)
+ .field("maxRecords", maxRecords)
.toString();
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
index 1706c7c..d818824 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpSubScan.java
@@ -41,16 +41,20 @@ public class HttpSubScan extends AbstractBase implements SubScan {
private final HttpScanSpec tableSpec;
private final List<SchemaPath> columns;
private final Map<String, String> filters;
+ private final int maxRecords;
@JsonCreator
public HttpSubScan(
@JsonProperty("tableSpec") HttpScanSpec tableSpec,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("filters") Map<String, String> filters) {
+ @JsonProperty("filters") Map<String, String> filters,
+ @JsonProperty("maxRecords") int maxRecords
+ ) {
super("user-if-needed");
this.tableSpec = tableSpec;
this.columns = columns;
this.filters = filters;
+ this.maxRecords = maxRecords;
}
@JsonProperty("tableSpec")
@@ -68,6 +72,11 @@ public class HttpSubScan extends AbstractBase implements SubScan {
return filters;
}
+ @JsonProperty("maxRecords")
+ public int maxRecords() {
+ return maxRecords;
+ }
+
@Override
public <T, X, E extends Throwable> T accept(
PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
@@ -76,7 +85,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
- return new HttpSubScan(tableSpec, columns, filters);
+ return new HttpSubScan(tableSpec, columns, filters, maxRecords);
}
@Override
@@ -96,6 +105,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
.field("tableSpec", tableSpec)
.field("columns", columns)
.field("filters", filters)
+ .field("maxRecords", maxRecords)
.toString();
}
@@ -115,6 +125,7 @@ public class HttpSubScan extends AbstractBase implements SubScan {
HttpSubScan other = (HttpSubScan) obj;
return Objects.equals(tableSpec, other.tableSpec)
&& Objects.equals(columns, other.columns)
- && Objects.equals(filters, other.filters);
+ && Objects.equals(filters, other.filters)
+ && Objects.equals(maxRecords, other.maxRecords);
}
}
diff --git a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 8918e74..5cb9991 100644
--- a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++ b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -206,14 +206,18 @@ public class SimpleHttp {
private void setupCache(Builder builder) {
int cacheSize = 10 * 1024 * 1024; // TODO Add cache size in MB to config
File cacheDirectory = new File(tempDir, "http-cache");
- if (!cacheDirectory.mkdirs()) {
- throw UserException.dataWriteError()
- .message("Could not create the HTTP cache directory")
- .addContext("Path", cacheDirectory.getAbsolutePath())
- .addContext("Please check the temp directory or disable HTTP caching.")
- .addContext(errorContext)
- .build(logger);
+ if (!cacheDirectory.exists()) {
+ if (!cacheDirectory.mkdirs()) {
+ throw UserException
+ .dataWriteError()
+ .message("Could not create the HTTP cache directory")
+ .addContext("Path", cacheDirectory.getAbsolutePath())
+ .addContext("Please check the temp directory or disable HTTP caching.")
+ .addContext(errorContext)
+ .build(logger);
+ }
}
+
try {
Cache cache = new Cache(cacheDirectory, cacheSize);
logger.debug("Caching HTTP Query Results at: {}", cacheDirectory);
diff --git a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 4c20e99..442aa5f 100644
--- a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++ b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -452,6 +452,17 @@ public class TestHttpPlugin extends ClusterTest {
}
@Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "SELECT sunrise, sunset FROM local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02` AS t1 LIMIT 5";
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("Limit", "maxRecords=5")
+ .match();
+ }
+
+ @Test
public void testSlowResponse() throws Exception {
try (MockWebServer server = startServer()) {