luocooong commented on a change in pull request #2256:
URL: https://github.com/apache/drill/pull/2256#discussion_r649306614
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpXMLBatchReader.java
##########
@@ -80,8 +81,14 @@ public void addContext(UserException.Builder builder) {
try {
xmlReader = new XMLReader(inStream, dataLevel, maxRecords);
ResultSetLoader resultLoader = negotiator.build();
+
+ implicitColumns = new ImplicitColumns(resultLoader.writer());
+ buildImplicitColumns();
+ populateImplicitFieldMap(http);
+
RowSetLoader rootRowWriter = resultLoader.writer();
xmlReader.open(rootRowWriter, errorContext);
+ xmlReader.setMetadata(implicitColumns);
Review comment:
Is `setMetadata(implicitColumns)` here equivalent to
`JsonLoaderBuilder().implicitFields(implicitColumns)` in the JSON reader?
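Just to illustrate how I read the two call sites (a rough side-by-side based only on this diff, not a suggested change):
```java
// XML reader: the implicit columns are attached after the reader is constructed.
xmlReader.open(rootRowWriter, errorContext);
xmlReader.setMetadata(implicitColumns);

// JSON reader: the implicit columns are supplied up front through the builder.
jsonLoader = new JsonLoaderBuilder()
    .implicitFields(implicitColumns)
    // ... other builder options as in HttpBatchReader
    .build();
```
If the two are equivalent, could the XML side use the same naming so the readers stay consistent?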
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
##########
@@ -24,26 +24,34 @@
import org.apache.drill.common.exceptions.ChildErrorContext;
import org.apache.drill.common.exceptions.CustomErrorContext;
import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.physical.impl.scan.framework.SchemaNegotiator;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
import org.apache.drill.exec.store.easy.json.loader.JsonLoader;
import org.apache.drill.exec.store.easy.json.loader.JsonLoaderImpl.JsonLoaderBuilder;
import org.apache.drill.exec.store.http.util.HttpProxyConfig;
import org.apache.drill.exec.store.http.util.HttpProxyConfig.ProxyBuilder;
import org.apache.drill.exec.store.http.util.SimpleHttp;
import org.apache.drill.exec.store.security.UsernamePasswordCredentials;
+import org.apache.drill.exec.store.ImplicitColumnUtils.ImplicitColumns;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
public class HttpBatchReader implements ManagedReader<SchemaNegotiator> {
+
+ private static final String[] STRING_METADATA_FIELDS = {"_response_message", "_response_protocol", "_response_url"};
Review comment:
Keep a space between the `{` and the `"`:
```java
{ "_response_message", "_response_protocol", "_response_url" }
```
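i.e. the full declaration would read (same field names as in the diff):
```java
private static final String[] STRING_METADATA_FIELDS = { "_response_message", "_response_protocol", "_response_url" };
```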
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/loader/JsonLoaderImpl.java
##########
@@ -276,6 +285,10 @@ public boolean readBatch() {
RowSetLoader rowWriter = rsLoader.writer();
while (rowWriter.start()) {
if (parser.next()) {
+ // Add implicit fields
+ if (implicitFields != null) {
Review comment:
I think `implicitFields` may be empty rather than null here, so I recommend
checking for emptiness instead:
```java
if (!implicitFields.isEmpty()) {
  // write the implicit column values
}
```
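`ImplicitColumns` does not seem to expose an emptiness check yet, so this would need a small accessor; a rough sketch (the method is my suggestion, not in this PR):
```java
// In ImplicitColumns: report whether any implicit columns were registered.
public boolean isEmpty() {
  return implicitColumns.isEmpty();
}
```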
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImplicitColumnUtils {
+ private static final Logger logger = LoggerFactory.getLogger(ImplicitColumnUtils.class);
+
+ public static class ImplicitColumns {
+ private final Map<String, ImplicitColumn> implicitColumns;
+ private final RowSetLoader rowWriter;
+
+ public ImplicitColumns (RowSetLoader rowWriter) {
+ this.implicitColumns = new HashMap<>();
+ this.rowWriter = rowWriter;
+ }
+
+ public void addImplicitColumn(String fieldName, MinorType type) {
+ implicitColumns.put(fieldName, new ImplicitColumn(fieldName, type, rowWriter));
+ }
+
+ public ImplicitColumn getColumn(String fieldName) {
+ return implicitColumns.get(fieldName);
+ }
+
+ public void writeImplicitColumns() {
+ ImplicitColumn column;
+ ScalarWriter writer;
+ MinorType dataType;
+ Object value;
+
+ for (Map.Entry<String, ImplicitColumn> columnEntry : implicitColumns.entrySet()) {
+ column = columnEntry.getValue();
+ writer = column.writer;
+ dataType = column.dataType;
+ value = column.value;
+
+ switch (dataType) {
+ case INT:
+ writer.setInt((Integer) value);
+ break;
+ case BIGINT:
+ writer.setLong((Long) value);
+ break;
+ case FLOAT4:
+ writer.setFloat((Float) value);
+ break;
+ case FLOAT8:
+ writer.setDouble((Double) value);
+ break;
+ case VARCHAR:
+ writer.setString((String) value);
+ break;
+ case BIT:
+ writer.setBoolean((Boolean) value);
+ break;
+ default:
+ logger.warn("{} types are not implemented as implicit fields.", dataType);
+ }
+ }
+ }
+ }
+
+ public static class ImplicitColumn {
+ private final String fieldName;
+ private final MinorType dataType;
+ private final int columnIndex;
+ private final ScalarWriter writer;
+ private Object value;
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader rowWriter) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName, this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
+ }
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader rowWriter, Object value) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName, this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
Review comment:
Should we check whether the column already exists in the schema before adding it?
```java
index = rowWriter.tupleSchema().index(fieldName);
if (index == -1) {
  index = addImplicitColumnToSchema(this.fieldName, this.dataType, rowWriter);
}
```
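With that check, the constructor could look roughly like this (sketch only, reusing the names from this diff):
```java
public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader rowWriter) {
  this.dataType = dataType;
  this.fieldName = fieldName;
  // Reuse the column if it was already added to the schema.
  int index = rowWriter.tupleSchema().index(fieldName);
  if (index == -1) {
    index = addImplicitColumnToSchema(fieldName, dataType, rowWriter);
  }
  this.columnIndex = index;
  this.writer = rowWriter.scalar(this.columnIndex);
}
```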
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
##########
@@ -77,10 +85,17 @@ public void addContext(UserException.Builder builder) {
errorContext);
// JSON loader setup
+ ResultSetLoader loader = negotiator.build();
+ implicitColumns = new ImplicitColumns(loader.writer());
+ buildImplicitColumns();
+
InputStream inStream = http.getInputStream();
+ populateImplicitFieldMap(http);
+
try {
jsonLoader = new JsonLoaderBuilder()
- .resultSetLoader(negotiator.build())
+ .implicitFields(implicitColumns)
Review comment:
Nice design. Could we provide a reference example for new plugins to follow?
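For example, a minimal batch reader for a hypothetical new plugin might wire it up like this (sketch only; the field name and `someMetadataValue` are made up, the rest follows this diff):
```java
// Inside the new plugin's open(SchemaNegotiator negotiator):
ResultSetLoader loader = negotiator.build();

ImplicitColumns implicitColumns = new ImplicitColumns(loader.writer());
implicitColumns.addImplicitColumn("_my_metadata_field", MinorType.VARCHAR);
implicitColumns.getColumn("_my_metadata_field").setValue(someMetadataValue);

jsonLoader = new JsonLoaderBuilder()
    .resultSetLoader(loader)
    .implicitFields(implicitColumns)
    // ... other builder options as in HttpBatchReader
    .build();
```
A short section in the plugin developer docs would be enough.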
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
##########
@@ -171,6 +175,12 @@ public InputStream getInputStream() {
.newCall(request)
.execute();
+ // Preserve the response
+ responseMessage = response.message();
+ responseCode = response.code();
+ responseProtocol = response.protocol().toString();
Review comment:
Can we make sure that `okhttp3.Protocol` and `okhttp3.HttpUrl` will not
be null?
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/util/SimpleHttp.java
##########
@@ -171,6 +175,12 @@ public InputStream getInputStream() {
.newCall(request)
.execute();
+ // Preserve the response
+ responseMessage = response.message();
+ responseCode = response.code();
+ responseProtocol = response.protocol().toString();
+ responseURL = response.request().url().toString();
Review comment:
Can we make sure that `okhttp3.Protocol` and `okhttp3.HttpUrl` will not
be null? Need to avoid the NPE.
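A defensive variant might look like this (sketch; okhttp may already guarantee non-null values here, in which case only the readability changes):
```java
// Preserve the response, guarding the toString() calls against nulls.
responseMessage = response.message();
responseCode = response.code();
responseProtocol = Objects.toString(response.protocol(), null);
responseURL = Objects.toString(response.request().url(), null);
```
(`java.util.Objects.toString(Object, String)` returns the default when the first argument is null.)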
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImplicitColumnUtils {
+ private static final Logger logger = LoggerFactory.getLogger(ImplicitColumnUtils.class);
+
+ public static class ImplicitColumns {
+ private final Map<String, ImplicitColumn> implicitColumns;
+ private final RowSetLoader rowWriter;
+
+ public ImplicitColumns (RowSetLoader rowWriter) {
Review comment:
Remove the white space after the `ImplicitColumns`.
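i.e.:
```java
public ImplicitColumns(RowSetLoader rowWriter) {
  this.implicitColumns = new HashMap<>();
  this.rowWriter = rowWriter;
}
```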
##########
File path:
contrib/storage-http/src/main/java/org/apache/drill/exec/store/http/HttpBatchReader.java
##########
@@ -97,6 +112,21 @@ public void addContext(UserException.Builder builder) {
return true; // Please read the first batch
}
+ protected void buildImplicitColumns() {
+ // Add String fields
+ for (String fieldName : STRING_METADATA_FIELDS) {
+ implicitColumns.addImplicitColumn(fieldName, MinorType.VARCHAR);
+ }
+ implicitColumns.addImplicitColumn(RESPONSE_CODE_FIELD, MinorType.INT);
+ }
+
+ protected void populateImplicitFieldMap(SimpleHttp http) {
+ implicitColumns.getColumn("_response_message").setValue(http.getResponseMessage());
Review comment:
Is it possible to use the constant here? e.g.
`getColumn(STRING_METADATA_FIELDS[0])`
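For example (assuming the matching getters exist on `SimpleHttp`; only `getResponseMessage()` appears in this hunk):
```java
implicitColumns.getColumn(STRING_METADATA_FIELDS[0]).setValue(http.getResponseMessage());
implicitColumns.getColumn(STRING_METADATA_FIELDS[1]).setValue(http.getResponseProtocol());
implicitColumns.getColumn(STRING_METADATA_FIELDS[2]).setValue(http.getResponseURL());
implicitColumns.getColumn(RESPONSE_CODE_FIELD).setValue(http.getResponseCode());
```
Or, better, give each field its own named constant so the lookup does not depend on array order.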
##########
File path:
exec/java-exec/src/main/java/org/apache/drill/exec/store/ImplicitColumnUtils.java
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store;
+
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class ImplicitColumnUtils {
+ private static final Logger logger = LoggerFactory.getLogger(ImplicitColumnUtils.class);
+
+ public static class ImplicitColumns {
+ private final Map<String, ImplicitColumn> implicitColumns;
+ private final RowSetLoader rowWriter;
+
+ public ImplicitColumns (RowSetLoader rowWriter) {
+ this.implicitColumns = new HashMap<>();
+ this.rowWriter = rowWriter;
+ }
+
+ public void addImplicitColumn(String fieldName, MinorType type) {
+ implicitColumns.put(fieldName, new ImplicitColumn(fieldName, type, rowWriter));
+ }
+
+ public ImplicitColumn getColumn(String fieldName) {
+ return implicitColumns.get(fieldName);
+ }
+
+ public void writeImplicitColumns() {
+ ImplicitColumn column;
+ ScalarWriter writer;
+ MinorType dataType;
+ Object value;
+
+ for (Map.Entry<String, ImplicitColumn> columnEntry : implicitColumns.entrySet()) {
+ column = columnEntry.getValue();
+ writer = column.writer;
+ dataType = column.dataType;
+ value = column.value;
+
+ switch (dataType) {
+ case INT:
+ writer.setInt((Integer) value);
+ break;
+ case BIGINT:
+ writer.setLong((Long) value);
+ break;
+ case FLOAT4:
+ writer.setFloat((Float) value);
+ break;
+ case FLOAT8:
+ writer.setDouble((Double) value);
+ break;
+ case VARCHAR:
+ writer.setString((String) value);
+ break;
+ case BIT:
+ writer.setBoolean((Boolean) value);
+ break;
+ default:
+ logger.warn("{} types are not implemented as implicit fields.", dataType);
+ }
+ }
+ }
+ }
+
+ public static class ImplicitColumn {
+ private final String fieldName;
+ private final MinorType dataType;
+ private final int columnIndex;
+ private final ScalarWriter writer;
+ private Object value;
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader rowWriter) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName, this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
+ }
+
+ public ImplicitColumn(String fieldName, MinorType dataType, RowSetLoader rowWriter, Object value) {
+ this.dataType = dataType;
+ this.fieldName = fieldName;
+ this.columnIndex = addImplicitColumnToSchema(this.fieldName, this.dataType, rowWriter);
+ this.writer = rowWriter.scalar(this.columnIndex);
+ this.value = value;
+ }
+
+ /**
+ * Adds an implicit column to the schema. Implicit columns are by default optional and excluded from wildcard
+ * queries. This should be used for file metadata or other metadata that you want to be present in a query, but only if
+ * a user specifically asks for it.
+ *
+ * @param fieldName The name of the implicit column to be added. Should start with an underscore
+ * @param type The minor type of the implicit field. Currently only non-complex types are supported with this class
+ * @param rowWriter The RowSetLoader
+ * @return The index of the newly added column.
+ */
+ private int addImplicitColumnToSchema(String fieldName, MinorType type, RowSetLoader rowWriter) {
+ ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type, DataMode.OPTIONAL);
+ colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
Review comment:
I am interested in the usage of
`setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true)`.
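If I read it correctly, a column flagged this way is created in the schema but skipped by wildcard projection, so a plain `SELECT *` will not return it while naming it explicitly will. A quick way to confirm the flag is set (sketch, assuming `booleanProperty()` is the read-side counterpart of `setBooleanProperty()`):
```java
ColumnMetadata colSchema = MetadataUtils.newScalar("_response_code", MinorType.INT, DataMode.OPTIONAL);
colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
// Expect true: the scan projection framework should omit this column from SELECT *.
boolean excluded = colSchema.booleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD);
```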
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]