This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 172bd06 DRILL-7951: Add Response Metadata Fields to HTTP Storage
Plugin (#2256)
172bd06 is described below
commit 172bd06e11824d664681098bd2817ece89b7aed0
Author: Charles S. Givre <[email protected]>
AuthorDate: Tue Jun 29 20:21:39 2021 -0400
DRILL-7951: Add Response Metadata Fields to HTTP Storage Plugin (#2256)
* Initial Commit
* Ready for code review
* Update README
* Fix NPE
* Whitespace fix
* Addressed Review Comments
---
.../org/apache/drill/exec/store/xml/XMLReader.java | 10 +-
contrib/storage-http/README.md | 18 ++-
.../drill/exec/store/http/HttpApiConfig.java | 14 +-
.../drill/exec/store/http/HttpBatchReader.java | 32 +++-
.../drill/exec/store/http/HttpCSVBatchReader.java | 7 +
.../drill/exec/store/http/HttpXMLBatchReader.java | 7 +
.../drill/exec/store/http/util/SimpleHttp.java | 67 ++++++++-
.../drill/exec/store/http/TestHttpPlugin.java | 90 ++++++++++-
.../drill/exec/store/ImplicitColumnUtils.java | 165 +++++++++++++++++++++
.../store/easy/json/loader/JsonLoaderImpl.java | 25 +++-
10 files changed, 416 insertions(+), 19 deletions(-)
diff --git
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
index e51ded6..97ce737 100644
---
a/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
+++
b/contrib/format-xml/src/main/java/org/apache/drill/exec/store/xml/XMLReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.exec.vector.accessor.TupleWriter;
import org.slf4j.Logger;
@@ -70,6 +71,7 @@ public class XMLReader {
private String fieldValue;
private InputStream fsStream;
private XMLEventReader reader;
+ private ImplicitColumns metadata;
/**
* This field indicates the various states in which the reader operates. The
names should be self explanatory,
@@ -94,7 +96,6 @@ public class XMLReader {
nestedMapCollection = new HashMap<>();
this.dataLevel = dataLevel;
this.maxRecords = maxRecords;
-
}
public void open(RowSetLoader rootRowWriter, CustomErrorContext errorContext
) {
@@ -331,6 +332,10 @@ public class XMLReader {
}
}
+ public void implicitFields(ImplicitColumns metadata) {
+ this.metadata = metadata;
+ }
+
private TupleWriter startRow(RowSetLoader writer) {
if (currentNestingLevel == dataLevel) {
rootRowWriter.start();
@@ -350,6 +355,9 @@ public class XMLReader {
*/
private TupleWriter endRow() {
logger.debug("Ending row");
+ if (metadata != null) {
+ metadata.writeImplicitColumns();
+ }
rootRowWriter.save();
rowStarted = false;
changeState(xmlState.ROW_ENDED);
diff --git a/contrib/storage-http/README.md b/contrib/storage-http/README.md
index 6cff6ca..9a5e048 100644
--- a/contrib/storage-http/README.md
+++ b/contrib/storage-http/README.md
@@ -215,7 +215,7 @@ At present, there is no provision to check the `status`
code in a response such
as that shown above. Drill assumes that the server will use HTTP status codes
to
indicate a bad request or other error.
-### Input Type
+#### Input Type
The REST plugin accepts three different types of input: `json`, `csv` and
`xml`. The default is `json`. If you are using `XML` as a data type, there is
an additional
configuration option called `xmlDataLevel` which reduces the level of unneeded
nesting found in XML files. You can find more information in the documentation
for Drill's XML
format plugin.
@@ -231,6 +231,11 @@ If the `authType` is set to `basic`, `username` and
`password` must be set in th
`password`: The password for basic authentication.
+#### errorOn400
+When a user makes HTTP calls, the response code will be from 100-599. 400
series error codes can contain useful information and in some cases you may
not want Drill to throw
+errors on 400 series responses. This option allows you to define Drill's
behavior on 400 series error codes. When set to `true`, Drill will throw an
exception and halt execution
+on 400 series errors; when set to `false`, Drill will return an empty result
set (with implicit fields populated).
+
## Usage
This plugin is different from other plugins in that the table component of
the `FROM` clause
@@ -371,6 +376,7 @@ To query this API, set the configuration as follows:
"password": null,
"postBody": null,
"inputType": "json",
+ "errorOn400": true
}
}
@@ -416,7 +422,8 @@ body. Set the configuration as follows:
"authType": "none",
"userName": null,
"password": null,
- "postBody": null
+ "postBody": null,
+ "errorOn400": true
}
}
@@ -555,3 +562,10 @@ If the query runs, but produces odd results, try a simple
`SELECT *` query. This
if there is unexpected message context in addition to the data. Use the
`dataPath` property
to ignore the extra content.
+## Implicit Fields
+The HTTP plugin includes four implicit fields which can be used for debugging.
These fields do not appear in star queries. They are:
+
+* `_response_code`: The response code from the HTTP request. This field is an
`INT`.
+* `_response_message`: The response message.
+* `_response_protocol`: The response protocol.
+* `_response_url`: The actual URL sent to the API.
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
index 86bd2ee..709faf8 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpApiConfig.java
@@ -73,6 +73,7 @@ public class HttpApiConfig {
private final String password;
private final String inputType;
private final int xmlDataLevel;
+ private final boolean errorOn400;
public enum HttpMethod {
@@ -97,7 +98,8 @@ public class HttpApiConfig {
@JsonProperty("dataPath") String dataPath,
@JsonProperty("requireTail") Boolean requireTail,
@JsonProperty("inputType") String inputType,
- @JsonProperty("xmlDataLevel") int xmlDataLevel) {
+ @JsonProperty("xmlDataLevel") int xmlDataLevel,
+ @JsonProperty("errorOn400") boolean errorOn400) {
this.headers = headers;
this.method = Strings.isNullOrEmpty(method)
@@ -139,6 +141,7 @@ public class HttpApiConfig {
? DEFAULT_INPUT_FORMAT : inputType.trim().toLowerCase();
this.xmlDataLevel = Math.max(1, xmlDataLevel);
+ this.errorOn400 = errorOn400;
}
@JsonProperty("url")
@@ -182,10 +185,13 @@ public class HttpApiConfig {
@JsonProperty("inputType")
public String inputType() { return inputType; }
+ @JsonProperty("errorOn400")
+ public boolean errorOn400() { return errorOn400; }
+
@Override
public int hashCode() {
return Objects.hash(url, method, requireTail, params, headers,
- authType, userName, password, postBody, inputType, xmlDataLevel);
+ authType, userName, password, postBody, inputType, xmlDataLevel,
errorOn400);
}
@Override
@@ -203,6 +209,7 @@ public class HttpApiConfig {
.field("filterFields", params)
.field("inputType", inputType)
.field("xmlDataLevel", xmlDataLevel)
+ .field("errorOn400", errorOn400)
.toString();
}
@@ -226,6 +233,7 @@ public class HttpApiConfig {
&& Objects.equals(dataPath, other.dataPath)
&& Objects.equals(requireTail, other.requireTail)
&& Objects.equals(inputType, other.inputType)
- && Objects.equals(xmlDataLevel, other.xmlDataLevel);
+ && Objects.equals(xmlDataLevel, other.xmlDataLevel)
+ && Objects.equals(errorOn400, other.errorOn400);
}
}
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
index 4ac41fe..cc8eb24 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
@@ -24,15 +24,18 @@ import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
import
org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.exec.store.http.util.HttpProxyConfig;
import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import java.io.File;
import java.io.InputStream;
@@ -40,10 +43,15 @@ import java.util.List;
import java.util.Map;
public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private static final String[] STRING_METADATA_FIELDS = {"_response_message",
"_response_protocol", "_response_url"};
+ private static final String RESPONSE_CODE_FIELD = "_response_code";
+
private final HttpSubScan subScan;
private final int maxRecords;
private JsonLoader jsonLoader;
private int recordCount;
+ protected ImplicitColumns implicitColumns;
public HttpBatchReader(HttpSubScan subScan) {
this.subScan = subScan;
@@ -77,10 +85,17 @@ public class HttpBatchReader implements
ManagedReader<SchemaNegotiator> {
errorContext);
// JSON loader setup
+ ResultSetLoader loader = negotiator.build();
+ implicitColumns = new ImplicitColumns(loader.writer());
+ buildImplicitColumns();
+
InputStream inStream = http.getInputStream();
+ populateImplicitFieldMap(http);
+
try {
jsonLoader = new JsonLoaderBuilder()
- .resultSetLoader(negotiator.build())
+ .implicitFields(implicitColumns)
+ .resultSetLoader(loader)
.standardOptions(negotiator.queryOptions())
.dataPath(subScan.tableSpec().connectionConfig().dataPath())
.errorContext(errorContext)
@@ -97,6 +112,21 @@ public class HttpBatchReader implements
ManagedReader<SchemaNegotiator> {
return true; // Please read the first batch
}
+ protected void buildImplicitColumns() {
+ // Add String fields
+ for (String fieldName : STRING_METADATA_FIELDS) {
+ implicitColumns.addImplicitColumn(fieldName, MinorType.VARCHAR);
+ }
+ implicitColumns.addImplicitColumn(RESPONSE_CODE_FIELD, MinorType.INT);
+ }
+
+ protected void populateImplicitFieldMap(SimpleHttp http) {
+
implicitColumns.getColumn(STRING_METADATA_FIELDS[0]).setValue(http.getResponseMessage());
+
implicitColumns.getColumn(STRING_METADATA_FIELDS[1]).setValue(http.getResponseProtocol());
+
implicitColumns.getColumn(STRING_METADATA_FIELDS[2]).setValue(http.getResponseURL());
+
implicitColumns.getColumn(RESPONSE_CODE_FIELD).setValue(http.getResponseCode());
+ }
+
protected HttpUrl buildUrl() {
HttpApiConfig apiConfig = subScan.tableSpec().connectionConfig();
String baseUrl = apiConfig.url();
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
index 831f6bd..acae0ca 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpCSVBatchReader.java
@@ -32,6 +32,7 @@ import
org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.vector.accessor.ScalarWriter;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
@@ -94,6 +95,11 @@ public class HttpCSVBatchReader extends HttpBatchReader {
negotiator.tableSchema(drillSchema, true);
ResultSetLoader resultLoader = negotiator.build();
+ // Add implicit columns
+ implicitColumns = new ImplicitColumns(resultLoader.writer());
+ buildImplicitColumns();
+ populateImplicitFieldMap(http);
+
// Create ScalarWriters
rowWriter = resultLoader.writer();
populateWriterArray();
@@ -154,6 +160,7 @@ public class HttpCSVBatchReader extends HttpBatchReader {
for (StringColumnWriter columnWriter : columnWriters) {
columnWriter.load(nextRow);
}
+ implicitColumns.writeImplicitColumns();
rowWriter.save();
return true;
}
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
index 2f1b38e..e1d691e 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.xml.XMLReader;
import org.slf4j.Logger;
@@ -80,8 +81,14 @@ public class HttpXMLBatchReader extends HttpBatchReader {
try {
xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
ResultSetLoader resultLoader = negotiator.build();
+
+ implicitColumns = new ImplicitColumns(resultLoader.writer());
+ buildImplicitColumns();
+ populateImplicitFieldMap(http);
+
RowSetLoader rootRowWriter = resultLoader.writer();
xmlReader.open(rootRowWriter, errorContext);
+ xmlReader.implicitFields(implicitColumns);
} catch (XMLStreamException e) {
throw UserException
.dataReadError(e)
diff --git
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
index 5cb9991..5e78c41 100644
---
a/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
+++
b/contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
@@ -64,6 +64,10 @@ public class SimpleHttp {
private final HttpProxyConfig proxyConfig;
private final CustomErrorContext errorContext;
private final HttpUrl url;
+ private String responseMessage;
+ private int responseCode;
+ private String responseProtocol;
+ private String responseURL;
public SimpleHttp(HttpSubScan scanDefn, HttpUrl url, File tempDir,
HttpProxyConfig proxyConfig, CustomErrorContext errorContext) {
@@ -171,8 +175,14 @@ public class SimpleHttp {
.newCall(request)
.execute();
+ // Preserve the response
+ responseMessage = response.message();
+ responseCode = response.code();
+ responseProtocol = response.protocol().toString();
+ responseURL = response.request().url().toString();
+
// If the request is unsuccessful, throw a UserException
- if (!response.isSuccessful()) {
+ if (! isSuccessful(responseCode)) {
throw UserException
.dataReadError()
.message("HTTP request failed")
@@ -182,7 +192,7 @@ public class SimpleHttp {
.build(logger);
}
logger.debug("HTTP Request for {} successful.", url());
- logger.debug("Response Headers: {} ", response.headers().toString());
+ logger.debug("Response Headers: {} ", response.headers());
// Return the InputStream of the response
return Objects.requireNonNull(response.body()).byteStream();
@@ -197,6 +207,59 @@ public class SimpleHttp {
}
/**
+ * This function is a replacement for the isSuccessful() function which comes
+ * with okhttp3. The issue is that in some cases, a user may not want Drill
to throw
+ * errors on 400 response codes. This function will return true/false
depending on the
+ * configuration for the specific connection.
+ * @param responseCode The HTTP response code of the request
+ * @return True if the response code is 200-299 and possibly 400-499, false
if other
+ */
+ private boolean isSuccessful(int responseCode) {
+ if (scanDefn.tableSpec().connectionConfig().errorOn400()) {
+ return ((responseCode >= 200 && responseCode <=299) ||
+ (responseCode >= 400 && responseCode <=499));
+ } else {
+ return responseCode >= 200 && responseCode <=299;
+ }
+ }
+
+ /**
+ * Gets the HTTP response code from the HTTP call. Note that this value
+ * is only available after the getInputStream() method has been called.
+ * @return int value of the HTTP response code
+ */
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ /**
+ * Gets the HTTP response message from the HTTP call. Note that this value
+ * is only available after the getInputStream() method has been called.
+ * @return The HTTP response message
+ */
+ public String getResponseMessage() {
+ return responseMessage;
+ }
+
+ /**
+ * Gets the HTTP response protocol from the HTTP call. Note that this value
+ * is only available after the getInputStream() method has been called.
+ * @return The HTTP response protocol
+ */
+ public String getResponseProtocol() {
+ return responseProtocol;
+ }
+
+ /**
+ * Gets the HTTP response URL from the HTTP call. Note that this value
+ * is only available after the getInputStream() method has been called.
+ * @return The HTTP response URL
+ */
+ public String getResponseURL() {
+ return responseURL;
+ }
+
+ /**
* Configures response caching using a provided temp directory.
*
* @param builder
diff --git
a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
index 934d12a..7aa510c 100644
---
a/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
+++
b/contrib/storage-http/src/test/java/org/apache/drill/exec/store/http/TestHttpPlugin.java
@@ -89,12 +89,12 @@ public class TestHttpPlugin extends ClusterTest {
*/
private static void makeLiveConfig() {
- HttpApiConfig sunriseConfig = new
HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null,
null, null, null, null, null, null, 0);
+ HttpApiConfig sunriseConfig = new
HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null,
null, null, null, null, null, null, 0, false);
HttpApiConfig sunriseWithParamsConfig = new
HttpApiConfig("https://api.sunrise-sunset.org/json", "GET", null, null, null,
null, null,
- Arrays.asList("lat", "lng", "date"), "results", false, null, 0);
+ Arrays.asList("lat", "lng", "date"), "results", false, null, 0, false);
HttpApiConfig stockConfig = new
HttpApiConfig("https://api.worldtradingdata.com/api/v1/stock?symbol=SNAP,TWTR,VOD"
+
-
".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4",
"get", null, null, null, null, null, null, null, null, null, 0);
+
".L&api_token=zuHlu2vZaehdZN6GmJdTiVlp7xgZn6gl6sfgmI4G6TY4ej0NLOzvy0TUl4D4",
"get", null, null, null, null, null, null, null, null, null, 0, false);
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("stock", stockConfig);
@@ -122,7 +122,7 @@ public class TestHttpPlugin extends ClusterTest {
// The connection acts like a schema.
// Ignores the message body except for data.
HttpApiConfig mockSchema = new HttpApiConfig("http://localhost:8091/json",
"GET", headers,
- "basic", "user", "pass", null, null, "results", null, null, 0);
+ "basic", "user", "pass", null, null, "results", null, null, 0, false);
// Use the mock server with the HTTP parameters passed as WHERE
// clause filters. The connection acts like a table.
@@ -130,15 +130,15 @@ public class TestHttpPlugin extends ClusterTest {
// This is the preferred approach, the base URL contains as much info as
possible;
// all other parameters are specified in SQL. See README for an example.
HttpApiConfig mockTable = new HttpApiConfig("http://localhost:8091/json",
"GET", headers,
- "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"),
"results", false, null, 0);
+ "basic", "user", "pass", null, Arrays.asList("lat", "lng", "date"),
"results", false, null, 0, false);
- HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/",
"POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null,
null, null, 0);
+ HttpApiConfig mockPostConfig = new HttpApiConfig("http://localhost:8091/",
"POST", headers, null, null, null, "key1=value1\nkey2=value2", null, null,
null, null, 0, false);
HttpApiConfig mockCsvConfig = new
HttpApiConfig("http://localhost:8091/csv", "GET", headers,
- "basic", "user", "pass", null, null, "results", null, "csv", 0);
+ "basic", "user", "pass", null, null, "results", null, "csv", 0, false);
HttpApiConfig mockXmlConfig = new
HttpApiConfig("http://localhost:8091/xml", "GET", headers,
- "basic", "user", "pass", null, null, "results", null, "xml", 2);
+ "basic", "user", "pass", null, null, "results", null, "xml", 2,false);
Map<String, HttpApiConfig> configs = new HashMap<>();
configs.put("sunrise", mockSchema);
@@ -394,6 +394,80 @@ public class TestHttpPlugin extends ClusterTest {
}
}
+ @Test
+ public void testImplicitFieldsWithJSON() throws Exception {
+ String sql = "SELECT _response_code, _response_message,
_response_protocol, _response_url FROM
local.sunrise.`?lat=36.7201600&lng=-4.4203400&date=2019-10-02`";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new
MockResponse().setResponseCode(200).setBody(TEST_JSON_RESPONSE));
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("_response_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_protocol", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_url", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(200, "OK", "http/1.1",
"http://localhost:8091/json?lat=36.7201600&lng=-4.4203400&date=2019-10-02")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testImplicitFieldsWithCSV() throws Exception {
+ String sql = "SELECT _response_code, _response_message,
_response_protocol, _response_url FROM local.mockcsv.`csv?arg1=4`";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new
MockResponse().setResponseCode(200).setBody(TEST_CSV_RESPONSE));
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("_response_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_protocol", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_url", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/csvcsv?arg1=4")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
+ @Test
+ public void testImplicitFieldsWithXML() throws Exception {
+ String sql = "SELECT _response_code, _response_message,
_response_protocol, _response_url FROM local.mockxml.`?arg1=4` LIMIT 5";
+ try (MockWebServer server = startServer()) {
+
+ server.enqueue(new
MockResponse().setResponseCode(200).setBody(TEST_XML_RESPONSE));
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("_response_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_protocol", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("_response_url", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .build();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .addRow(200, "OK", "http/1.1", "http://localhost:8091/xml?arg1=4")
+ .build();
+
+ RowSetUtilities.verify(expected, results);
+ }
+ }
+
private void doSimpleTestWithMockServer(String sql) throws Exception {
try (MockWebServer server = startServer()) {
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
new file mode 100644
index 0000000..626debd
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImplicitColumnUtils {
+ private static final Logger logger =
LoggerFactory.getLogger(ImplicitColumnUtils.class);
+
+ /**
+ * This class represents an implicit column in a dataset. These columns are
typically used for metadata that is consistent
+ * across an entire dataset. A filename for example, or HTTP response
codes. It is good practice to name
+ * implicit fields with an underscore so that these field names do not
conflict with fields from the user's
+ * data. For example _http_response_code.
+ *
+ * Implicit fields do not appear in star queries so a user must explicitly
include them in queries for them to appear.
+ */
+ public static class ImplicitColumns {
+ private final Map<String, ImplicitColumn> implicitColumns;
+ private final RowSetLoader rowWriter;
+
+ public ImplicitColumns(RowSetLoader rowWriter) {
+ this.implicitColumns = new HashMap<>();
+ this.rowWriter = rowWriter;
+ }
+
+ /**
+ * Adds an implicit column. If there already is a column with the same
name, the previous column
+ * will be overwritten.
+ * @param fieldName Name of the implicit column. Recommended that this
start with an underscore.
+ * @param type The Drill MinorType of the implicit column. Currently only
supports simple types.
+ */
+ public void addImplicitColumn(String fieldName, MinorType type) {
+ implicitColumns.put(fieldName, new ImplicitColumn(fieldName, type,
rowWriter));
+ }
+
+ /**
+ * Returns a requested ImplicitColumn. If the column cannot be found,
will return null.
+ * @param fieldName The field name of the desired column
+ * @return The specific column requested, null if that column does not
exist.
+ */
+ public ImplicitColumn getColumn(String fieldName) {
+ return implicitColumns.get(fieldName);
+ }
+
+ /**
+ * This function writes the data to the implicit columns. This should be
called in the next() method
+ * in a batch reader so that the columns get populated. If there are no
implicit columns, this function
+ * will do nothing, so null checks are not necessary.
+ */
+ public void writeImplicitColumns() {
+ ImplicitColumn column;
+ ScalarWriter writer;
+ MinorType dataType;
+ Object value;
+
+ for (Map.Entry<String, ImplicitColumn> columnEntry :
implicitColumns.entrySet()) {
+ column = columnEntry.getValue();
+ writer = column.writer;
+ dataType = column.dataType;
+ value = column.value;
+
+ switch (dataType) {
+ case INT:
+ writer.setInt((Integer) value);
+ break;
+ case BIGINT:
+ writer.setLong((Long) value);
+ break;
+ case FLOAT4:
+ writer.setFloat((Float) value);
+ break;
+ case FLOAT8:
+ writer.setDouble((Double) value);
+ break;
+ case VARCHAR:
+ writer.setString((String) value);
+ break;
+ case BIT:
+ writer.setBoolean((Boolean) value);
+ break;
+ default:
+ logger.warn("{} types are not implemented as implicit fields.",
dataType);
+ }
+ }
+ }
+ }
+
+ public static class ImplicitColumn {
+ private final String fieldName;
+ private final MinorType dataType;
+ private final int columnIndex;
+ private final ScalarWriter writer;
+ private Object value;
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader
rowWriter) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName,
this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
+ }
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader
rowWriter, Object value) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName,
this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
+ this.value = value;
+ }
+
+ /**
+ * Adds an implicit column to the schema. Implicit columns are by default
optional and excluded from wildcard
+ * queries. This should be used for file metadata or other metadata that
you want to be present in a query, but only if
+ * a user specifically asks for it.
+ *
+ * @param fieldName The name of the implicit column to be added. Should
start with an underscore
+ * @param type The minor type of the implicit field. Currently only
non-complex types are supported with this class
+ * @param rowWriter The RowSetLoader
+ * @return The index of the newly added column.
+ */
+ private int addImplicitColumnToSchema(String fieldName, MinorType type,
RowSetLoader rowWriter) {
+ ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type,
DataMode.OPTIONAL);
+ colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+ return rowWriter.addColumn(colSchema);
+ }
+
+ public String getFieldName() { return fieldName; }
+
+ public MinorType getDataType() { return dataType; }
+
+ public int getColumnIndex() { return columnIndex; }
+
+ public Object getValue() { return value; }
+
+ public void setValue(Object v) { value = v; }
+
+ public ScalarWriter getWriter() { return writer; }
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
index 97cd200..0e7fd4d 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.resultSet.RowSetLoader;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.server.options.OptionSet;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import org.apache.drill.exec.store.easy.json.extended.ExtendedTypeFieldFactory;
import org.apache.drill.exec.store.easy.json.parser.ErrorFactory;
import org.apache.drill.exec.store.easy.json.parser.JsonStructureParser;
@@ -105,7 +106,7 @@ import com.fasterxml.jackson.core.JsonToken;
* boolean flags as in the prior version.</li>
* <li>Reports errors as {@link UserException} objects, complete with context
* information, rather than as generic Java exception as in the prior
version.</li>
- * <li>Moves parse options into a separate {@link JsonOptions} class.</li>
+ * <li>Moves parse options into a separate {@link JsonLoaderOptions}
class.</li>
* <li>Iteration protocol is simpler: simply call {@link #readBatch()} until
it returns
* {@code false}. Errors are reported out-of-band via an exception.</li>
* <li>The result set loader abstraction is perfectly happy with an empty
schema.
@@ -146,6 +147,8 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
private Reader reader;
private String dataPath;
private MessageParser messageParser;
+ private ImplicitColumns implicitFields;
+ private int maxRows;
public JsonLoaderBuilder resultSetLoader(ResultSetLoader rsLoader) {
this.rsLoader = rsLoader;
@@ -167,6 +170,11 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
return this;
}
+ public JsonLoaderBuilder implicitFields(ImplicitColumns metadata) {
+ this.implicitFields = metadata;
+ return this;
+ }
+
public JsonLoaderBuilder errorContext(CustomErrorContext errorContext) {
this.errorContext = errorContext;
return this;
@@ -197,6 +205,11 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
return this;
}
+ public JsonLoaderBuilder maxRows(int maxRows) {
+ this.maxRows = maxRows;
+ return this;
+ }
+
public JsonLoader build() {
// Defaults, primarily for testing.
if (options == null) {
@@ -218,6 +231,8 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
private final CustomErrorContext errorContext;
private final JsonStructureParser parser;
private final FieldFactory fieldFactory;
+ private final ImplicitColumns implicitFields;
+ private final int maxRows;
private boolean eof;
/**
@@ -235,7 +250,9 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
protected JsonLoaderImpl(JsonLoaderBuilder builder) {
this.rsLoader = builder.rsLoader;
this.options = builder.options;
- this.errorContext = builder. errorContext;
+ this.errorContext = builder.errorContext;
+ this.implicitFields = builder.implicitFields;
+ this.maxRows = builder.maxRows;
this.fieldFactory = buildFieldFactory(builder);
this.parser = buildParser(builder);
}
@@ -276,6 +293,10 @@ public class JsonLoaderImpl implements JsonLoader,
ErrorFactory {
RowSetLoader rowWriter = rsLoader.writer();
while (rowWriter.start()) {
if (parser.next()) {
+ // Add implicit fields
+ if (implicitFields != null) {
+ implicitFields.writeImplicitColumns();
+ }
rowWriter.save();
} else {
eof = true;