This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 74b277d DRILL-7313: Use Hive schema for MaprDB native reader when field was empty
74b277d is described below
commit 74b277d2779064e62cf589f1dfaeb25f9bb4527f
Author: Volodymyr Vysotskyi <[email protected]>
AuthorDate: Tue Jul 2 17:25:59 2019 +0300
DRILL-7313: Use Hive schema for MaprDB native reader when field was empty
- Added all_text_mode option for the Hive MapR-DB JSON reader
- Improved the logic that converts a Hive schema into a Drill schema
- Added unit tests for schema conversion
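For illustration, a minimal, hypothetical sketch (mirroring the new TestSchemaConversion test added below; the column name and Hive type string are made up) of how the HiveUtilities.getColumnMetadata helper introduced here turns a Hive column definition into Drill's ColumnMetadata:

import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.metastore.api.FieldSchema;

public class SchemaConversionExample {
  public static void main(String[] args) {
    // Converter that maps Hive TypeInfo to Calcite RelDataType using Drill's type system
    HiveToRelDataTypeConverter converter =
        new HiveToRelDataTypeConverter(new SqlTypeFactoryImpl(new DrillRelDataTypeSystem()));
    // Hypothetical Hive column "a" declared as struct<t1:boolean,t2:int>
    FieldSchema column = new FieldSchema("a", "struct<t1:boolean,t2:int>", null);
    // The new helper converts it into Drill ColumnMetadata (here a MAP column with nullable children)
    ColumnMetadata metadata = HiveUtilities.getColumnMetadata(converter, column);
    System.out.println(metadata);
  }
}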
---
.../exec/store/mapr/db/MapRDBFormatMatcher.java | 3 +-
.../exec/store/mapr/db/MapRDBFormatPlugin.java | 14 +-
.../drill/exec/store/mapr/db/MapRDBGroupScan.java | 53 ++++--
.../store/mapr/db/MapRDBPushFilterIntoScan.java | 12 +-
.../store/mapr/db/MapRDBPushLimitIntoScan.java | 2 +-
.../mapr/db/MapRDBRestrictedScanBatchCreator.java | 4 +-
.../exec/store/mapr/db/MapRDBScanBatchCreator.java | 4 +-
.../drill/exec/store/mapr/db/MapRDBSubScan.java | 31 ++--
.../store/mapr/db/RestrictedMapRDBSubScan.java | 13 +-
.../store/mapr/db/binary/BinaryTableGroupScan.java | 35 ++--
.../store/mapr/db/json/JsonTableGroupScan.java | 49 +++---
.../store/mapr/db/json/MaprDBJsonRecordReader.java | 11 +-
.../mapr/db/json/RestrictedJsonRecordReader.java | 6 +-
.../mapr/db/json/RestrictedJsonTableGroupScan.java | 20 ++-
...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 38 +++--
.../planner/types/HiveToRelDataTypeConverter.java | 29 ++--
.../drill/exec/store/hive/HiveUtilities.java | 100 ++++++++++++
.../store/hive/schema/TestSchemaConversion.java | 178 +++++++++++++++++++++
.../java/org/apache/drill/exec/ExecConstants.java | 7 +-
.../drill/exec/planner/sql/TypeInferenceUtils.java | 33 ++++
.../exec/server/options/SystemOptionManager.java | 1 +
.../drill/exec/store/dfs/easy/EasyGroupScan.java | 5 +-
.../exec/vector/complex/fn/JsonReaderUtils.java | 59 +++++--
.../FileSystemMetadataProviderManager.java | 31 ++++
.../java-exec/src/main/resources/drill-module.conf | 1 +
.../src/main/codegen/templates/ComplexCopier.java | 15 +-
.../exec/record/metadata/RepeatedListBuilder.java | 30 ++++
.../drill/metastore/util/SchemaPathUtils.java | 2 +
28 files changed, 645 insertions(+), 141 deletions(-)
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
index f982278..73c5ffb 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatMatcher.java
@@ -86,7 +86,8 @@ public class MapRDBFormatMatcher extends TableFormatMatcher {
dt.setGroupScan(fp.getGroupScan(userName,
selection,
null /* columns */,
- (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc()));
+ (IndexDesc) ((MapRDBIndexDescriptor) secondaryIndexDesc).getOriginalDesc(),
+ null /* metadataProviderManager */));
return dt;
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index 6501f8c..a401094 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.store.mapr.TableFormatPlugin;
import org.apache.drill.exec.store.mapr.db.binary.BinaryTableGroupScan;
import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.drill.metastore.MetadataProviderManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,9 +47,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import com.mapr.db.index.IndexDesc;
import com.mapr.fs.tables.TableProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class MapRDBFormatPlugin extends TableFormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBFormatPlugin.class);
+ private static final Logger logger = LoggerFactory.getLogger(MapRDBFormatPlugin.class);
private final MapRDBFormatMatcher matcher;
private final Configuration hbaseConf;
@@ -115,25 +118,24 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
MapRDBPushLimitIntoScan.LIMIT_ON_SCAN,
MapRDBPushLimitIntoScan.LIMIT_ON_RKJOIN);
}
-
public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
- List<SchemaPath> columns, IndexDesc indexDesc) throws IOException {
+ List<SchemaPath> columns, IndexDesc indexDesc, MetadataProviderManager metadataProviderManager) throws IOException {
String tableName = getTableName(selection);
TableProperties props = getMaprFS().getTableProperties(new Path(tableName));
if (props.getAttr().getJson()) {
JsonScanSpec scanSpec = new JsonScanSpec(tableName, indexDesc, null/*condition*/);
- return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+ return new JsonTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns, metadataProviderManager);
} else {
HBaseScanSpec scanSpec = new HBaseScanSpec(tableName);
- return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns);
+ return new BinaryTableGroupScan(userName, getStoragePlugin(), this, scanSpec, columns, metadataProviderManager);
}
}
}
@Override
public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
List<SchemaPath> columns) throws IOException {
- return getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */);
+ return getGroupScan(userName, selection, columns, (IndexDesc) null /* indexDesc */, null /* metadataProviderManager */);
}
public boolean supportsStatistics() {
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
index 07943d9..1f0b626 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBGroupScan.java
@@ -43,10 +43,13 @@ import org.apache.drill.exec.planner.index.IndexDiscover;
import org.apache.drill.exec.planner.index.IndexDiscoverFactory;
import org.apache.drill.exec.planner.index.MapRDBIndexDiscover;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.drill.metastore.metadata.TableMetadata;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -56,6 +59,9 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBGroupScan.class);
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = Comparator.comparingInt(List::size);
+ private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+
protected AbstractStoragePlugin storagePlugin;
protected MapRDBFormatPlugin formatPlugin;
@@ -74,14 +80,9 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
private Stopwatch watch = Stopwatch.createUnstarted();
- private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR = new Comparator<List<MapRDBSubScanSpec>>() {
- @Override
- public int compare(List<MapRDBSubScanSpec> list1, List<MapRDBSubScanSpec> list2) {
- return list1.size() - list2.size();
- }
- };
+ private TableMetadataProvider metadataProvider;
- private static final Comparator<List<MapRDBSubScanSpec>> LIST_SIZE_COMPARATOR_REV = Collections.reverseOrder(LIST_SIZE_COMPARATOR);
+ private TableMetadata tableMetadata;
public MapRDBGroupScan(MapRDBGroupScan that) {
super(that);
@@ -96,27 +97,30 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
* during the copy-constructor
*/
this.doNotAccessRegionsToScan = that.doNotAccessRegionsToScan;
+ this.metadataProvider = that.metadataProvider;
}
public MapRDBGroupScan(AbstractStoragePlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName) {
+ MapRDBFormatPlugin formatPlugin, List<SchemaPath> columns, String userName,
+ TableMetadataProvider metadataProvider) {
super(userName);
this.storagePlugin = storagePlugin;
this.formatPlugin = formatPlugin;
- this.formatPluginConfig = (MapRDBFormatPluginConfig)formatPlugin.getConfig();
+ this.formatPluginConfig = formatPlugin.getConfig();
this.columns = columns;
+ this.metadataProvider = metadataProvider;
}
@Override
public List<EndpointAffinity> getOperatorAffinity() {
watch.reset();
watch.start();
- Map<String, DrillbitEndpoint> endpointMap = new HashMap<String, DrillbitEndpoint>();
+ Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
for (DrillbitEndpoint ep : formatPlugin.getContext().getBits()) {
endpointMap.put(ep.getAddress(), ep);
}
- final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<DrillbitEndpoint, EndpointAffinity>();
+ final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>();
for (String serverName : getRegionsToScan().values()) {
DrillbitEndpoint ep = endpointMap.get(serverName);
if (ep != null) {
@@ -230,9 +234,9 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
* While there are slots with lesser than 'minPerEndpointSlot' unit work, balance from those with more.
*/
while(minHeap.peek() != null && minHeap.peek().size() < minPerEndpointSlot) {
- List<MapRDBSubScanSpec> smallestList = (List<MapRDBSubScanSpec>) minHeap.poll();
- List<MapRDBSubScanSpec> largestList = (List<MapRDBSubScanSpec>) maxHeap.poll();
- smallestList.add(largestList.remove(largestList.size()-1));
+ List<MapRDBSubScanSpec> smallestList = minHeap.poll();
+ List<MapRDBSubScanSpec> largestList = maxHeap.poll();
+ smallestList.add(largestList.remove(largestList.size() - 1));
if (largestList.size() > minPerEndpointSlot) {
maxHeap.offer(largestList);
}
@@ -344,4 +348,25 @@ public abstract class MapRDBGroupScan extends AbstractDbGroupScan {
public PluginCost getPluginCostModel() {
return formatPlugin.getPluginCostModel();
}
+
+ @JsonProperty
+ public TupleMetadata getSchema() {
+ return getTableMetadata().getSchema();
+ }
+
+ @Override
+ @JsonIgnore
+ public TableMetadataProvider getMetadataProvider() {
+ return metadataProvider;
+ }
+
+ @Override
+ @JsonIgnore
+ public TableMetadata getTableMetadata() {
+ if (tableMetadata == null) {
+ tableMetadata = metadataProvider.getTableMetadata();
+ }
+ return tableMetadata;
+ }
+
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
index 6bd3368..d5afbdb 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushFilterIntoScan.java
@@ -41,7 +41,6 @@ import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRule {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBPushFilterIntoScan.class);
private MapRDBPushFilterIntoScan(RelOptRuleOperand operand, String description) {
super(operand, description);
@@ -100,7 +99,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
@Override
public boolean matches(RelOptRuleCall call) {
- final ScanPrel scan = (ScanPrel) call.rel(2);
+ final ScanPrel scan = call.rel(2);
if (scan.getGroupScan() instanceof BinaryTableGroupScan ||
scan.getGroupScan() instanceof JsonTableGroupScan) {
return super.matches(call);
@@ -124,7 +123,7 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
return;
}
- LogicalExpression conditionExp = null;
+ LogicalExpression conditionExp;
try {
conditionExp = DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
} catch (ClassCastException e) {
@@ -182,9 +181,10 @@ public abstract class MapRDBPushFilterIntoScan extends StoragePluginOptimizerRul
}
// Pass tableStats from old groupScan so we do not go and fetch stats (an expensive operation) again from MapR DB client.
- final BinaryTableGroupScan newGroupsScan = new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
- groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns(),
- groupScan.getTableStats());
+ final BinaryTableGroupScan newGroupsScan =
+ new BinaryTableGroupScan(groupScan.getUserName(), groupScan.getStoragePlugin(),
+ groupScan.getFormatPlugin(), newScanSpec, groupScan.getColumns(),
+ groupScan.getTableStats(), groupScan.getMetadataProvider());
newGroupsScan.setFilterPushedDown(true);
final ScanPrel newScanPrel = new ScanPrel(scan.getCluster(),
filter.getTraitSet(), newGroupsScan, scan.getRowType(), scan.getTable());
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
index 28d59d0..325277b 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBPushLimitIntoScan.java
@@ -171,7 +171,7 @@ public abstract class MapRDBPushLimitIntoScan extends StoragePluginOptimizerRule
oldScanSpec.getStopRow(), oldScanSpec.getFilter());
return new BinaryTableGroupScan(binaryTableGroupScan.getUserName(), binaryTableGroupScan.getStoragePlugin(),
binaryTableGroupScan.getFormatPlugin(), newScanSpec, binaryTableGroupScan.getColumns(),
- binaryTableGroupScan.getTableStats()).applyLimit(offset + fetch);
+ binaryTableGroupScan.getTableStats(), binaryTableGroupScan.getMetadataProvider()).applyLimit(offset + fetch);
}
return null;
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
index cb3732a..bdea6e8 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBRestrictedScanBatchCreator.java
@@ -38,8 +38,8 @@ public class MapRDBRestrictedScanBatchCreator implements BatchCreator<Restricted
List<RecordReader> readers = Lists.newArrayList();
for(MapRDBSubScanSpec scanSpec : subScan.getRegionScanSpecList()){
try {
- readers.add(new RestrictedJsonRecordReader((RestrictedMapRDBSubScanSpec)scanSpec, subScan.getFormatPlugin(), subScan.getColumns(),
- context, subScan.getMaxRecordsToRead()));
+ readers.add(new RestrictedJsonRecordReader(scanSpec, subScan.getFormatPlugin(), subScan.getColumns(),
+ context, subScan.getMaxRecordsToRead(), subScan.getSchema()));
} catch (Exception e1) {
throw new ExecutionSetupException(e1);
}
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
index 2f53398..6fe467f 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBScanBatchCreator.java
@@ -34,7 +34,6 @@ import org.apache.drill.exec.store.mapr.db.json.MaprDBJsonRecordReader;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBScanBatchCreator.class);
@Override
public ScanBatch getBatch(ExecutorFragmentContext context, MapRDBSubScan subScan, List<RecordBatch> children) throws ExecutionSetupException {
@@ -48,7 +47,8 @@ public class MapRDBScanBatchCreator implements BatchCreator<MapRDBSubScan> {
getHBaseSubScanSpec(scanSpec),
subScan.getColumns()));
} else {
- readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPlugin(), subScan.getColumns(), context, subScan.getMaxRecordsToRead()));
+ readers.add(new MaprDBJsonRecordReader(scanSpec, subScan.getFormatPlugin(),
+ subScan.getColumns(), context, subScan.getMaxRecordsToRead(), subScan.getSchema()));
}
} catch (Exception e) {
throw new ExecutionSetupException(e);
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
index 220b90e..9b41243 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBSubScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mapr.db;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -28,6 +29,8 @@ import org.apache.drill.exec.physical.base.AbstractDbSubScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.StoragePluginRegistry;
import com.fasterxml.jackson.annotation.JacksonInject;
@@ -35,19 +38,18 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
// Class containing information for reading a single HBase region
@JsonTypeName("maprdb-sub-scan")
public class MapRDBSubScan extends AbstractDbSubScan {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MapRDBSubScan.class);
private final MapRDBFormatPlugin formatPlugin;
private final List<MapRDBSubScanSpec> regionScanSpecList;
private final List<SchemaPath> columns;
private final int maxRecordsToRead;
private final String tableType;
+ private final TupleMetadata schema;
@JsonCreator
public MapRDBSubScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -57,34 +59,38 @@ public class MapRDBSubScan extends AbstractDbSubScan {
@JsonProperty("regionScanSpecList")
List<MapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("maxRecordsToRead") int maxRecordsToRead,
- @JsonProperty("tableType") String tableType) throws
ExecutionSetupException {
+ @JsonProperty("tableType") String tableType,
+ // TODO: DRILL-7314 - replace TupleSchema with
TupleMetadata
+ @JsonProperty("schema") TupleSchema schema) throws
ExecutionSetupException {
this(userName,
(MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
regionScanSpecList,
columns,
maxRecordsToRead,
- tableType);
+ tableType,
+ schema);
}
public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
- List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType) {
- this(userName, formatPlugin, maprSubScanSpecs, columns, -1, tableType);
+ List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, String tableType, TupleMetadata schema) {
+ this(userName, formatPlugin, maprSubScanSpecs, columns, -1, tableType, schema);
}
public MapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
- List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) {
+ List<MapRDBSubScanSpec> maprSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType, TupleMetadata schema) {
super(userName);
this.formatPlugin = formatPlugin;
this.regionScanSpecList = maprSubScanSpecs;
this.columns = columns;
this.maxRecordsToRead = maxRecordsToRead;
this.tableType = tableType;
+ this.schema = schema;
}
@JsonProperty("formatPluginConfig")
public MapRDBFormatPluginConfig getFormatPluginConfig() {
- return (MapRDBFormatPluginConfig) formatPlugin.getConfig();
+ return formatPlugin.getConfig();
}
@JsonProperty("storageConfig")
@@ -112,6 +118,11 @@ public class MapRDBSubScan extends AbstractDbSubScan {
return tableType;
}
+ @JsonProperty("schema")
+ public TupleMetadata getSchema() {
+ return schema;
+ }
+
@Override
public boolean isExecutable() {
return false;
@@ -125,12 +136,12 @@ public class MapRDBSubScan extends AbstractDbSubScan {
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.isEmpty());
- return new MapRDBSubScan(getUserName(), formatPlugin, regionScanSpecList, columns, tableType);
+ return new MapRDBSubScan(getUserName(), formatPlugin, regionScanSpecList, columns, tableType, schema);
}
@Override
public Iterator<PhysicalOperator> iterator() {
- return ImmutableSet.<PhysicalOperator>of().iterator();
+ return Collections.emptyIterator();
}
@Override
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
index eedbca5..8b696c7 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/RestrictedMapRDBSubScan.java
@@ -29,6 +29,8 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.StoragePluginRegistry;
/**
@@ -47,15 +49,18 @@ public class RestrictedMapRDBSubScan extends MapRDBSubScan {
@JsonProperty("regionScanSpecList")
List<RestrictedMapRDBSubScanSpec> regionScanSpecList,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("maxRecordsToRead") int maxRecordsToRead,
- @JsonProperty("tableType") String tableType) throws
ExecutionSetupException {
+ @JsonProperty("tableType") String tableType,
+ // TODO: DRILL-7314 - replace TupleSchema with
TupleMetadata
+ @JsonProperty("schema") TupleSchema schema) throws
ExecutionSetupException {
this(userName,
(MapRDBFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatPluginConfig),
- regionScanSpecList, columns, maxRecordsToRead, tableType);
+ regionScanSpecList, columns, maxRecordsToRead, tableType, schema);
}
public RestrictedMapRDBSubScan(String userName, MapRDBFormatPlugin formatPlugin,
- List<RestrictedMapRDBSubScanSpec> maprDbSubScanSpecs, List<SchemaPath> columns, int maxRecordsToRead, String tableType) {
- super(userName, formatPlugin, new ArrayList<MapRDBSubScanSpec>(), columns, maxRecordsToRead, tableType);
+ List<RestrictedMapRDBSubScanSpec> maprDbSubScanSpecs,
+ List<SchemaPath> columns, int maxRecordsToRead, String tableType, TupleMetadata schema) {
+ super(userName, formatPlugin, new ArrayList<>(), columns, maxRecordsToRead, tableType, schema);
for(RestrictedMapRDBSubScanSpec restrictedSpec : maprDbSubScanSpecs) {
getRegionScanSpecList().add(restrictedSpec);
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
index a135464..96f96ca 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/binary/BinaryTableGroupScan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.planner.index.Statistics;
import org.apache.drill.exec.store.StoragePluginRegistry;
@@ -46,6 +47,9 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScan;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
+import org.apache.drill.metastore.FileSystemMetadataProviderManager;
+import org.apache.drill.metastore.MetadataProviderManager;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
@@ -60,10 +64,12 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@JsonTypeName("maprdb-binary-scan")
public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseConstants {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BinaryTableGroupScan.class);
+ private static final Logger logger = LoggerFactory.getLogger(BinaryTableGroupScan.class);
public static final String TABLE_BINARY = "binary";
@@ -79,24 +85,25 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
@JsonProperty("storage") FileSystemConfig
storagePluginConfig,
@JsonProperty("format") MapRDBFormatPluginConfig
formatPluginConfig,
@JsonProperty("columns") List<SchemaPath>
columns,
- @JacksonInject StoragePluginRegistry
pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName,
- (AbstractStoragePlugin)
pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin)
pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
- scanSpec, columns);
+ // TODO: DRILL-7314 - replace TupleSchema with
TupleMetadata
+ @JsonProperty("schema") TupleSchema schema,
+ @JacksonInject StoragePluginRegistry
pluginRegistry) throws ExecutionSetupException, IOException {
+ this(userName, (AbstractStoragePlugin)
pluginRegistry.getPlugin(storagePluginConfig),
+ (MapRDBFormatPlugin)
pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+ scanSpec, columns, null /* tableStats */,
FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns) {
- super(storagePlugin, formatPlugin, columns, userName);
- this.hbaseScanSpec = scanSpec;
- init();
+ MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec, List<SchemaPath> columns,
+ MetadataProviderManager metadataProviderManager) throws IOException {
+ this(userName, storagePlugin, formatPlugin, scanSpec,
+ columns, null /* tableStats */, FileSystemMetadataProviderManager.getMetadataProvider(metadataProviderManager));
}
public BinaryTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec,
- List<SchemaPath> columns, MapRDBTableStats tableStats) {
- super(storagePlugin, formatPlugin, columns, userName);
+ MapRDBFormatPlugin formatPlugin, HBaseScanSpec scanSpec,
+ List<SchemaPath> columns, MapRDBTableStats tableStats, TableMetadataProvider metadataProvider) {
+ super(storagePlugin, formatPlugin, columns, userName, metadataProvider);
this.hbaseScanSpec = scanSpec;
this.tableStats = tableStats;
init();
@@ -173,7 +180,7 @@ public class BinaryTableGroupScan extends MapRDBGroupScan implements DrillHBaseC
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d]
but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
- return new MapRDBSubScan(getUserName(), formatPlugin,
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY);
+ return new MapRDBSubScan(getUserName(), formatPlugin,
endpointFragmentMapping.get(minorFragmentId), columns, TABLE_BINARY,
getSchema());
}
@Override
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
index 24f370a..414c7f3 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/JsonTableGroupScan.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.physical.base.IndexGroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.planner.index.IndexDescriptor;
import org.apache.drill.exec.planner.index.MapRDBIndexDescriptor;
@@ -64,6 +65,9 @@ import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.MapRDBTableStats;
import org.apache.drill.exec.store.mapr.db.TabletFragmentInfo;
import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.metastore.FileSystemMetadataProviderManager;
+import org.apache.drill.metastore.MetadataProviderManager;
+import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.codehaus.jackson.annotate.JsonCreator;
import org.ojai.store.QueryCondition;
@@ -119,26 +123,25 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
@JsonProperty("storage") FileSystemConfig
storagePluginConfig,
@JsonProperty("format") MapRDBFormatPluginConfig
formatPluginConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JacksonInject StoragePluginRegistry
pluginRegistry) throws IOException, ExecutionSetupException {
- this (userName,
- (AbstractStoragePlugin)
pluginRegistry.getPlugin(storagePluginConfig),
- (MapRDBFormatPlugin)
pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
- scanSpec, columns);
+ // TODO: DRILL-7314 - replace TupleSchema with
TupleMetadata
+ @JsonProperty("schema") TupleSchema schema,
+ @JacksonInject StoragePluginRegistry
pluginRegistry) throws ExecutionSetupException, IOException {
+ this(userName, (AbstractStoragePlugin)
pluginRegistry.getPlugin(storagePluginConfig),
+ (MapRDBFormatPlugin)
pluginRegistry.getFormatPlugin(storagePluginConfig, formatPluginConfig),
+ scanSpec, columns, new MapRDBStatistics(),
FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
- MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns) {
- super(storagePlugin, formatPlugin, columns, userName);
- this.scanSpec = scanSpec;
- this.stats = new MapRDBStatistics();
- this.forcedRowCountMap = new HashMap<>();
- init();
+ MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns,
+ MetadataProviderManager metadataProviderManager) throws IOException {
+ this(userName, storagePlugin, formatPlugin, scanSpec, columns,
+ new MapRDBStatistics(), FileSystemMetadataProviderManager.getMetadataProvider(metadataProviderManager));
}
- public JsonTableGroupScan(String userName, FileSystemPlugin storagePlugin,
+ public JsonTableGroupScan(String userName, AbstractStoragePlugin storagePlugin,
MapRDBFormatPlugin formatPlugin, JsonScanSpec scanSpec, List<SchemaPath> columns,
- MapRDBStatistics stats) {
- super(storagePlugin, formatPlugin, columns, userName);
+ MapRDBStatistics stats, TableMetadataProvider metadataProvider) {
+ super(storagePlugin, formatPlugin, columns, userName, metadataProvider);
this.scanSpec = scanSpec;
this.stats = stats;
this.forcedRowCountMap = new HashMap<>();
@@ -293,7 +296,7 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
assert minorFragmentId < endpointFragmentMapping.size() : String.format(
"Mappings length [%d] should be greater than minor fragment id [%d]
but it isn't.", endpointFragmentMapping.size(),
minorFragmentId);
- return new MapRDBSubScan(getUserName(), formatPlugin,
endpointFragmentMapping.get(minorFragmentId), columns, maxRecordsToRead,
TABLE_JSON);
+ return new MapRDBSubScan(getUserName(), formatPlugin,
endpointFragmentMapping.get(minorFragmentId), columns, maxRecordsToRead,
TABLE_JSON, getSchema());
}
@Override
@@ -471,15 +474,21 @@ public class JsonTableGroupScan extends MapRDBGroupScan implements IndexGroupSca
@Override
public RestrictedJsonTableGroupScan getRestrictedScan(List<SchemaPath> columns) {
- RestrictedJsonTableGroupScan newScan =
- new RestrictedJsonTableGroupScan(this.getUserName(),
+ try {
+ RestrictedJsonTableGroupScan newScan = new RestrictedJsonTableGroupScan(this.getUserName(),
(FileSystemPlugin) this.getStoragePlugin(),
this.getFormatPlugin(),
this.getScanSpec(),
this.getColumns(),
- this.getStatistics());
- newScan.columns = columns;
- return newScan;
+ this.getStatistics(),
+ // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata
+ (TupleSchema) this.getSchema());
+
+ newScan.columns = columns;
+ return newScan;
+ } catch (IOException e) {
+ throw new DrillRuntimeException("Error happened when constructing RestrictedJsonTableGroupScan", e);
+ }
}
/**
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
index 2d32e9f..5e80e62 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/MaprDBJsonRecordReader.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
@@ -122,6 +123,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
private final boolean ignoreSchemaChange;
private final boolean disableCountOptimization;
private final boolean nonExistentColumnsProjection;
+ private final TupleMetadata schema;
protected final MapRDBSubScanSpec subScanSpec;
protected final MapRDBFormatPlugin formatPlugin;
@@ -133,8 +135,8 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
protected Document lastDocument;
public MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
- List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords) {
- this(subScanSpec, formatPlugin, projectedColumns, context);
+ List<SchemaPath> projectedColumns, FragmentContext context, int maxRecords, TupleMetadata schema) {
+ this(subScanSpec, formatPlugin, projectedColumns, context, schema);
this.maxRecordsToRead = maxRecords;
this.lastDocumentReader = null;
this.lastDocument = null;
@@ -142,12 +144,13 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
protected MaprDBJsonRecordReader(MapRDBSubScanSpec subScanSpec, MapRDBFormatPlugin formatPlugin,
- List<SchemaPath> projectedColumns, FragmentContext context) {
+ List<SchemaPath> projectedColumns, FragmentContext context, TupleMetadata schema) {
buffer = context.getManagedBuffer();
final Path tablePath = new Path(Preconditions.checkNotNull(subScanSpec, "MapRDB reader needs a sub-scan spec").getTableName());
this.subScanSpec = subScanSpec;
this.formatPlugin = formatPlugin;
+ this.schema = schema;
final IndexDesc indexDesc = subScanSpec.getIndexDesc();
byte[] serializedFilter = subScanSpec.getSerializedFilter();
condition = null;
@@ -445,7 +448,7 @@ public class MaprDBJsonRecordReader extends AbstractRecordReader {
}
if (nonExistentColumnsProjection && recordCount > 0) {
- JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), allTextMode, Collections.emptyList());
+ JsonReaderUtils.ensureAtLeastOneField(vectorWriter, getColumns(), schema, allTextMode, Collections.emptyList());
}
vectorWriter.setValueCount(recordCount);
if (maxRecordsToRead > 0) {
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
index bf150c1..a382163 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonRecordReader.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.physical.impl.join.RowKeyJoin;
import org.apache.drill.exec.record.AbstractRecordBatch;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBSubScanSpec;
import org.apache.drill.exec.store.mapr.db.RestrictedMapRDBSubScanSpec;
@@ -61,9 +62,10 @@ public class RestrictedJsonRecordReader extends MaprDBJsonRecordReader {
MapRDBFormatPlugin formatPlugin,
List<SchemaPath> projectedColumns,
FragmentContext context,
- int maxRecordsToRead) {
+ int maxRecordsToRead,
+ TupleMetadata schema) {
- super(subScanSpec, formatPlugin, projectedColumns, context, maxRecordsToRead);
+ super(subScanSpec, formatPlugin, projectedColumns, context, maxRecordsToRead, schema);
batchSize = (int)context.getOptions().getOption(ExecConstants.QUERY_ROWKEYJOIN_BATCHSIZE);
int idx = 0;
FieldPath[] scannedFields = this.getScannedFields();
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
index 055c5a5..e1285eb 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/json/RestrictedJsonTableGroupScan.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.mapr.db.json;
+import java.io.IOException;
import java.util.List;
import java.util.NavigableMap;
@@ -24,6 +25,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.exec.record.metadata.TupleSchema;
+import org.apache.drill.metastore.FileSystemMetadataProviderManager;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -53,12 +56,15 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
@JsonCreator
public RestrictedJsonTableGroupScan(@JsonProperty("userName") String userName,
- @JsonProperty("storage") FileSystemPlugin storagePlugin,
- @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
- @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
- @JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("") MapRDBStatistics statistics) {
- super(userName, storagePlugin, formatPlugin, scanSpec, columns, statistics);
+ @JsonProperty("storage") FileSystemPlugin storagePlugin,
+ @JsonProperty("format") MapRDBFormatPlugin formatPlugin,
+ @JsonProperty("scanSpec") JsonScanSpec scanSpec, /* scan spec of the original table */
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("") MapRDBStatistics statistics,
+ // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata
+ @JsonProperty("schema") TupleSchema schema) throws IOException {
+ super(userName, storagePlugin, formatPlugin, scanSpec, columns,
+ statistics, FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema));
}
// TODO: this method needs to be fully implemented
@@ -82,7 +88,7 @@ public class RestrictedJsonTableGroupScan extends JsonTableGroupScan {
minorFragmentId);
RestrictedMapRDBSubScan subscan =
new RestrictedMapRDBSubScan(getUserName(), formatPlugin,
- getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON);
+ getEndPointFragmentMapping(minorFragmentId), columns, maxRecordsToRead, TABLE_JSON, getSchema());
return subscan;
}
diff --git a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index 7aca59d..8b18347 100644
--- a/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ b/contrib/storage-hive/core/scrMapr/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -21,13 +21,16 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.planner.index.MapRDBStatistics;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
+import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.hive.HiveMetadataProvider;
import org.apache.drill.exec.store.hive.HiveReadEntry;
@@ -37,9 +40,11 @@ import org.apache.drill.exec.store.mapr.db.MapRDBFormatPlugin;
import org.apache.drill.exec.store.mapr.db.MapRDBFormatPluginConfig;
import org.apache.drill.exec.store.mapr.db.json.JsonScanSpec;
import org.apache.drill.exec.store.mapr.db.json.JsonTableGroupScan;
+import org.apache.drill.metastore.FileSystemMetadataProviderManager;
import org.apache.hadoop.hive.maprdb.json.input.HiveMapRDBJsonInputFormat;
import org.ojai.DocumentConstants;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -105,7 +110,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
To ensure Drill MapR-DB Json scan will be chosen, reduce Hive scan importance to 0.
*/
call.getPlanner().setImportance(hiveScanRel, 0.0);
- } catch (DrillRuntimeException e) {
+ } catch (Exception e) {
// TODO: Improve error handling after allowing to throw IOException from StoragePlugin.getFormatPlugin()
logger.warn("Failed to convert HiveScan to JsonScanSpec. Fallback to HiveMapR-DB connector.", e);
}
@@ -114,28 +119,43 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi
/**
* Helper method which creates a DrillScanRel with native Drill HiveScan.
*/
- private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) {
+ private DrillScanRel createNativeScanRel(DrillScanRel hiveScanRel, PlannerSettings settings) throws IOException {
RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory();
HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
- Map<String, String> parameters = hiveScan.getHiveReadEntry().getHiveTableWrapper().getParameters();
+ HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
+ Map<String, String> parameters = hiveReadEntry.getHiveTableWrapper().getParameters();
JsonScanSpec scanSpec = new JsonScanSpec(parameters.get(MAPRDB_TABLE_NAME), null, null);
List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
.map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath))
.collect(Collectors.toList());
+
+ // creates TupleMetadata based on Hive's schema (with optional data modes) to be used in the reader
+ // for the case when column type wasn't discovered
+ HiveToRelDataTypeConverter dataTypeConverter = new HiveToRelDataTypeConverter(typeFactory);
+ TupleMetadata schema = new TupleSchema();
+ hiveReadEntry.getTable().getColumnListsCache().getTableSchemaColumns()
+ .forEach(column -> schema.addColumn(HiveUtilities.getColumnMetadata(dataTypeConverter, column)));
+
+ MapRDBFormatPluginConfig formatConfig = new MapRDBFormatPluginConfig();
+
+ formatConfig.readTimestampWithZoneOffset =
+ settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET);
+
+ formatConfig.allTextMode = settings.getOptions().getBoolean(ExecConstants.HIVE_MAPRDB_JSON_ALL_TEXT_MODE);
+
JsonTableGroupScan nativeMapRDBScan =
new JsonTableGroupScan(
hiveScan.getUserName(),
hiveScan.getStoragePlugin(),
// TODO: We should use Hive format plugins here, once it will be implemented. DRILL-6621
- (MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(new MapRDBFormatPluginConfig()),
+ (MapRDBFormatPlugin) hiveScan.getStoragePlugin().getFormatPlugin(formatConfig),
scanSpec,
- hiveScanCols
+ hiveScanCols,
+ new MapRDBStatistics(),
+ FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema)
);
- nativeMapRDBScan.getFormatPlugin().getConfig().readTimestampWithZoneOffset =
- settings.getOptions().getBoolean(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET);
-
List<String> nativeScanColNames = hiveScanRel.getRowType().getFieldList().stream()
.map(field -> replaceOverriddenColumnId(parameters, field.getName()))
.collect(Collectors.toList());
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java
index 3b3abf2..6cb88ae 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/types/HiveToRelDataTypeConverter.java
@@ -70,45 +70,44 @@ public class HiveToRelDataTypeConverter {
*/
public RelDataType convertToNullableRelDataType(FieldSchema field) {
TypeInfo fieldTypeInfo = TypeInfoUtils.getTypeInfoFromTypeString(field.getType());
- RelDataType relDataType = convertToRelDataType(fieldTypeInfo);
- return typeFactory.createTypeWithNullability(relDataType, true);
+ return convertToRelDataType(fieldTypeInfo, true);
}
- private RelDataType convertToRelDataType(TypeInfo typeInfo) {
+ private RelDataType convertToRelDataType(TypeInfo typeInfo, boolean nullable) {
final Category typeCategory = typeInfo.getCategory();
switch (typeCategory) {
case PRIMITIVE:
- return getRelDataType((PrimitiveTypeInfo) typeInfo);
+ return typeFactory.createTypeWithNullability(getRelDataType((PrimitiveTypeInfo) typeInfo), nullable);
case LIST:
- return getRelDataType((ListTypeInfo) typeInfo);
+ return typeFactory.createTypeWithNullability(getRelDataType((ListTypeInfo) typeInfo, nullable), nullable);
case MAP:
- return getRelDataType((MapTypeInfo) typeInfo);
+ return typeFactory.createTypeWithNullability(getRelDataType((MapTypeInfo) typeInfo, nullable), nullable);
case STRUCT:
- return getRelDataType((StructTypeInfo) typeInfo);
+ return typeFactory.createTypeWithNullability(getRelDataType((StructTypeInfo) typeInfo, nullable), nullable);
case UNION:
logger.warn("There is no UNION data type in SQL. Converting it to Sql type OTHER to avoid " +
"breaking INFORMATION_SCHEMA queries");
- return typeFactory.createSqlType(SqlTypeName.OTHER);
+ return typeFactory.createTypeWithNullability(typeFactory.createSqlType(SqlTypeName.OTHER), nullable);
}
throw new RuntimeException(String.format(UNSUPPORTED_HIVE_DATA_TYPE_ERROR_MSG, typeCategory));
}
- private RelDataType getRelDataType(StructTypeInfo structTypeInfo) {
+ private RelDataType getRelDataType(StructTypeInfo structTypeInfo, boolean nullable) {
final List<String> fieldNames = structTypeInfo.getAllStructFieldNames();
final List<RelDataType> relDataTypes = structTypeInfo.getAllStructFieldTypeInfos().stream()
- .map(this::convertToRelDataType)
+ .map(typeInfo -> convertToRelDataType(typeInfo, nullable))
.collect(Collectors.toList());
return typeFactory.createStructType(relDataTypes, fieldNames);
}
- private RelDataType getRelDataType(MapTypeInfo mapTypeInfo) {
- RelDataType keyType = convertToRelDataType(mapTypeInfo.getMapKeyTypeInfo());
- RelDataType valueType = convertToRelDataType(mapTypeInfo.getMapValueTypeInfo());
+ private RelDataType getRelDataType(MapTypeInfo mapTypeInfo, boolean nullable) {
+ RelDataType keyType = convertToRelDataType(mapTypeInfo.getMapKeyTypeInfo(), nullable);
+ RelDataType valueType = convertToRelDataType(mapTypeInfo.getMapValueTypeInfo(), nullable);
return typeFactory.createMapType(keyType, valueType);
}
- private RelDataType getRelDataType(ListTypeInfo listTypeInfo) {
- RelDataType listElemTypeInfo = convertToRelDataType(listTypeInfo.getListElementTypeInfo());
+ private RelDataType getRelDataType(ListTypeInfo listTypeInfo, boolean nullable) {
+ RelDataType listElemTypeInfo = convertToRelDataType(listTypeInfo.getListElementTypeInfo(), nullable);
return typeFactory.createArrayType(listElemTypeInfo, -1);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 1b1c3e3..4920f17 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -17,6 +17,18 @@
*/
package org.apache.drill.exec.store.hive;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.planner.sql.TypeInferenceUtils;
+import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MapColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import io.netty.buffer.DrillBuf;
@@ -758,5 +770,93 @@ public class HiveUtilities {
return new HiveTableWrapper.HivePartitionWrapper(new HivePartition(partition, listIndex));
}
+ /**
+ * Converts specified {@code RelDataType relDataType} into {@link ColumnMetadata}.
+ * For the case when specified relDataType is struct, map with recursively converted children
+ * will be created.
+ *
+ * @param name field name
+ * @param relDataType field type
+ * @return {@link ColumnMetadata} which corresponds to specified {@code RelDataType relDataType}
+ */
+ public static ColumnMetadata getColumnMetadata(String name, RelDataType relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case ARRAY:
+ return getArrayMetadata(name, relDataType);
+ case MAP:
+ case OTHER:
+ throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
+ default:
+ if (relDataType.isStruct()) {
+ return getStructMetadata(name, relDataType);
+ } else {
+ return new PrimitiveColumnMetadata(
+ MaterializedField.create(name,
+ TypeInferenceUtils.getDrillMajorTypeFromCalciteType(relDataType)));
+ }
+ }
+ }
+
+ /**
+ * Returns {@link ColumnMetadata} instance which corresponds to specified array {@code RelDataType relDataType}.
+ *
+ * @param name name of the field
+ * @param relDataType the source of type information to construct the schema
+ * @return {@link ColumnMetadata} instance
+ */
+ private static ColumnMetadata getArrayMetadata(String name, RelDataType relDataType) {
+ RelDataType componentType = relDataType.getComponentType();
+ ColumnMetadata childColumnMetadata = getColumnMetadata(name, componentType);
+ switch (componentType.getSqlTypeName()) {
+ case ARRAY:
+ // for the case when nested type is array, it should be placed into repeated list
+ return MetadataUtils.newRepeatedList(name, childColumnMetadata);
+ case MAP:
+ case OTHER:
+ throw new UnsupportedOperationException(String.format("Unsupported data type: %s", relDataType.getSqlTypeName()));
+ default:
+ if (componentType.isStruct()) {
+ // for the case when nested type is struct, it should be placed into repeated map
+ return MetadataUtils.newMapArray(name, childColumnMetadata.mapSchema());
+ } else {
+ // otherwise creates column metadata with repeated data mode
+ return new PrimitiveColumnMetadata(
+ MaterializedField.create(name,
+ Types.overrideMode(
+ TypeInferenceUtils.getDrillMajorTypeFromCalciteType(componentType),
+ DataMode.REPEATED)));
+ }
+ }
+ }
+
+ /**
+ * Returns {@link MapColumnMetadata} column metadata created based on specified {@code RelDataType relDataType} with
+ * converted to {@link ColumnMetadata} {@code relDataType}'s children.
+ *
+ * @param name name of the field
+ * @param relDataType {@link RelDataType} the source of the children for resulting schema
+ * @return {@link MapColumnMetadata} column metadata
+ */
+ private static MapColumnMetadata getStructMetadata(String name, RelDataType relDataType) {
+ TupleMetadata mapSchema = new TupleSchema();
+ for (RelDataTypeField relDataTypeField : relDataType.getFieldList()) {
+ mapSchema.addColumn(getColumnMetadata(relDataTypeField.getName(), relDataTypeField.getType()));
+ }
+ return MetadataUtils.newMap(name, mapSchema);
+ }
+
+ /**
+ * Converts specified {@code FieldSchema column} into {@link ColumnMetadata}.
+ * For the case when specified relDataType is struct, map with recursively converted children
+ * will be created.
+ *
+ * @param dataTypeConverter converter to obtain Calcite's types from Hive's ones
+ * @param column column to convert
+ * @return {@link ColumnMetadata} which corresponds to specified {@code FieldSchema column}
+ */
+ public static ColumnMetadata getColumnMetadata(HiveToRelDataTypeConverter dataTypeConverter, FieldSchema column) {
+ RelDataType relDataType = dataTypeConverter.convertToNullableRelDataType(column);
+ return getColumnMetadata(column.getName(), relDataType);
+ }
}
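As a quick illustration of the new conversion entry point, a minimal hedged sketch (the field name and Hive type string below are invented for the example; the wiring follows the unit tests added further down):

    HiveToRelDataTypeConverter converter =
        new HiveToRelDataTypeConverter(new SqlTypeFactoryImpl(new DrillRelDataTypeSystem()));
    // A Hive struct column is converted into a Drill map column with recursively converted children.
    FieldSchema hiveColumn = new FieldSchema("user", "struct<id:int,name:varchar(255)>", "");
    ColumnMetadata drillColumn = HiveUtilities.getColumnMetadata(converter, hiveColumn);
    // drillColumn is expected to be a MapColumnMetadata with OPTIONAL INT and VARCHAR(255) children.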
diff --git
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java
new file mode 100644
index 0000000..7154c1b
--- /dev/null
+++
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/schema/TestSchemaConversion.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.hive.schema;
+
+import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.planner.types.DrillRelDataTypeSystem;
+import org.apache.drill.exec.planner.types.HiveToRelDataTypeConverter;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.PrimitiveColumnMetadata;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({SlowTest.class})
+public class TestSchemaConversion {
+ private static final HiveToRelDataTypeConverter dataTypeConverter
+ = new HiveToRelDataTypeConverter(new SqlTypeFactoryImpl(new
DrillRelDataTypeSystem()));
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testPrimitiveSchema() {
+ verifyConversion("int", Types.optional(TypeProtos.MinorType.INT));
+ verifyConversion("varchar(123)",
TypeProtos.MajorType.newBuilder().setMinorType(TypeProtos.MinorType.VARCHAR).setPrecision(123).build());
+ verifyConversion("timestamp",
Types.optional(TypeProtos.MinorType.TIMESTAMP));
+ }
+
+ @Test
+ public void testStructSchema() {
+ ColumnMetadata expectedSchema = new SchemaBuilder()
+ .addMap("a")
+ .addNullable("t1", TypeProtos.MinorType.BIT)
+ .addNullable("t2", TypeProtos.MinorType.INT)
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("struct<t1:boolean,t2:int>", expectedSchema);
+
+ expectedSchema = new SchemaBuilder()
+ .addMap("a")
+ .addNullable("t1", TypeProtos.MinorType.BIT)
+ .addMap("t2")
+ .addNullable("t3", TypeProtos.MinorType.VARDECIMAL, 38, 8)
+ .resumeMap()
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("struct<t1:boolean,t2:struct<t3:decimal(38,8)>>",
expectedSchema);
+ }
+
+ @Test
+ public void testRepeatedSchema() {
+ verifyConversion("array<boolean>",
Types.repeated(TypeProtos.MinorType.BIT));
+
+ ColumnMetadata expectedSchema = new SchemaBuilder()
+ .addRepeatedList("a")
+ .addArray(TypeProtos.MinorType.BIT)
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("array<array<boolean>>", expectedSchema);
+
+ expectedSchema = new SchemaBuilder()
+ .addRepeatedList("a")
+ .addDimension()
+ .addArray(TypeProtos.MinorType.BIT)
+ .resumeList()
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("array<array<array<boolean>>>", expectedSchema);
+ }
+
+ @Test
+ public void testRepeatedStructSchema() {
+ ColumnMetadata expectedSchema = new SchemaBuilder()
+ .addMapArray("a")
+ .addNullable("t1", TypeProtos.MinorType.VARCHAR, 999)
+ .addNullable("t2", TypeProtos.MinorType.INT)
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("array<struct<t1:varchar(999),t2:int>>", expectedSchema);
+
+ expectedSchema = new SchemaBuilder()
+ .addRepeatedList("a")
+ .addMapArray()
+ .addNullable("t1", TypeProtos.MinorType.VARCHAR, 999)
+ .addNullable("t2", TypeProtos.MinorType.INT)
+ .resumeList()
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+ verifyConversion("array<array<struct<t1:varchar(999),t2:int>>>",
expectedSchema);
+
+ expectedSchema = new SchemaBuilder()
+ .addRepeatedList("a")
+ .addDimension()
+ .addMapArray()
+ .addRepeatedList("t1")
+ .addArray(TypeProtos.MinorType.VARCHAR, 999)
+ .resumeMap()
+ .addArray("t2", TypeProtos.MinorType.VARDECIMAL, 28, 14)
+ .resumeList()
+ .resumeList()
+ .resumeSchema()
+ .buildSchema()
+ .metadata(0);
+
verifyConversion("array<array<array<struct<t1:array<array<varchar(999)>>,t2:array<decimal(28,14)>>>>>",
expectedSchema);
+ }
+
+ @Test
+ public void testUnionSchema() {
+ thrown.expect(UnsupportedOperationException.class);
+ convertType("uniontype<int,double>");
+ }
+
+ @Test
+ public void testListUnionSchema() {
+ thrown.expect(UnsupportedOperationException.class);
+ convertType("array<uniontype<int,double>>");
+ }
+
+ @Test
+ public void testStructUnionSchema() {
+ thrown.expect(UnsupportedOperationException.class);
+ convertType("struct<a:uniontype<int,double>,b:int>");
+ }
+
+ @Test
+ public void testMapSchema() {
+ thrown.expect(UnsupportedOperationException.class);
+ convertType("map<int,varchar(23)>");
+ }
+
+ @Test
+ public void testRepeatedMapSchema() {
+ thrown.expect(UnsupportedOperationException.class);
+ convertType("array<map<int,varchar(23)>>");
+ }
+
+ private void verifyConversion(String hiveType, TypeProtos.MajorType type) {
+ assertEquals(new PrimitiveColumnMetadata("a", type).columnString(),
convertType(hiveType).columnString());
+ }
+
+ private void verifyConversion(String hiveType, ColumnMetadata
expectedSchema) {
+ assertEquals(expectedSchema.columnString(),
convertType(hiveType).columnString());
+ }
+
+ private ColumnMetadata convertType(String type) {
+ return HiveUtilities.getColumnMetadata(dataTypeConverter, new
FieldSchema("a", type, ""));
+ }
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index a7bfd96..1815855 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -531,7 +531,12 @@ public final class ExecConstants {
public static final String
HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET =
"store.hive.maprdb_json.read_timestamp_with_timezone_offset";
public static final OptionValidator
HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET_VALIDATOR =
new
BooleanValidator(HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET,
- new OptionDescription("Enables Drill to read timestamp values with
timezone offset when hive plugin is used and Drill native MaprDB JSON reader
usage is enabled. (Drill 1.16+)"));
+ new OptionDescription("Enables Drill to read timestamp values with
timezone offset when Hive plugin is used and Drill native MaprDB JSON reader
usage is enabled. (Drill 1.16+)"));
+
+ public static final String HIVE_MAPRDB_JSON_ALL_TEXT_MODE =
"store.hive.maprdb_json.all_text_mode";
+ public static final OptionValidator HIVE_MAPRDB_JSON_ALL_TEXT_MODE_VALIDATOR
=
+ new BooleanValidator(HIVE_MAPRDB_JSON_ALL_TEXT_MODE,
+ new OptionDescription("Drill reads all data from the maprDB Json
tables as VARCHAR when hive plugin is used and Drill native MaprDB JSON reader
usage is enabled. Prevents schema change errors. (Drill 1.17+)"));
public static final String HIVE_CONF_PROPERTIES =
"store.hive.conf.properties";
public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new
StringValidator(HIVE_CONF_PROPERTIES,
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java
index 9b01424..2953314 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/TypeInferenceUtils.java
@@ -226,6 +226,39 @@ public class TypeInferenceUtils {
}
/**
+ * Returns a {@link TypeProtos.MajorType} instance which corresponds to the specified {@code RelDataType relDataType},
+ * preserving its nullability, and its scale and precision when available.
+ *
+ * @param relDataType RelDataType to convert
+ * @return {@link TypeProtos.MajorType} instance
+ */
+ public static TypeProtos.MajorType
getDrillMajorTypeFromCalciteType(RelDataType relDataType) {
+ final SqlTypeName sqlTypeName = relDataType.getSqlTypeName();
+
+ TypeProtos.MinorType minorType = getDrillTypeFromCalciteType(sqlTypeName);
+ TypeProtos.MajorType.Builder typeBuilder =
TypeProtos.MajorType.newBuilder().setMinorType(minorType);
+ switch (minorType) {
+ case VAR16CHAR:
+ case VARCHAR:
+ case VARBINARY:
+ case TIMESTAMP:
+ if (relDataType.getPrecision() > 0) {
+ typeBuilder.setPrecision(relDataType.getPrecision());
+ }
+ break;
+ case VARDECIMAL:
+ typeBuilder.setPrecision(relDataType.getPrecision());
+ typeBuilder.setScale(relDataType.getScale());
+ }
+ if (relDataType.isNullable()) {
+ typeBuilder.setMode(TypeProtos.DataMode.OPTIONAL);
+ } else {
+ typeBuilder.setMode(TypeProtos.DataMode.REQUIRED);
+ }
+ return typeBuilder.build();
+ }
+
+ /**
* Given a Calcite's SqlTypeName, return a Drill's corresponding
TypeProtos.MinorType
*/
public static TypeProtos.MinorType getDrillTypeFromCalciteType(final
SqlTypeName sqlTypeName) {
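A hedged sketch of the new major-type conversion (the DECIMAL example is illustrative; the type factory setup mirrors the schema-conversion test above):

    RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(new DrillRelDataTypeSystem());
    // A nullable DECIMAL(38, 8) should map to OPTIONAL VARDECIMAL carrying the same precision and scale.
    RelDataType decimalType = typeFactory.createTypeWithNullability(
        typeFactory.createSqlType(SqlTypeName.DECIMAL, 38, 8), true);
    TypeProtos.MajorType majorType = TypeInferenceUtils.getDrillMajorTypeFromCalciteType(decimalType);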
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 4c28887..88bba32 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -204,6 +204,7 @@ public class SystemOptionManager extends BaseOptionManager
implements AutoClosea
new
OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
new
OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
new
OptionDefinition(ExecConstants.HIVE_READ_MAPRDB_JSON_TIMESTAMP_WITH_TIMEZONE_OFFSET_VALIDATOR),
+ new
OptionDefinition(ExecConstants.HIVE_MAPRDB_JSON_ALL_TEXT_MODE_VALIDATOR),
new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR),
new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index c71cc3d..ac83a71 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -90,6 +90,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
@JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("selectionRoot") Path selectionRoot,
+ // TODO: DRILL-7314 - replace TupleSchema with TupleMetadata
@JsonProperty("schema") TupleSchema schema
) throws IOException, ExecutionSetupException {
super(ImpersonationUtil.resolveUserName(userName));
@@ -271,7 +272,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
String.format("MinorFragmentId %d has no read entries assigned",
minorFragmentId));
EasySubScan subScan = new EasySubScan(getUserName(),
convert(filesForMinor), formatPlugin,
- columns, selectionRoot, partitionDepth,
getTableMetadata().getSchema());
+ columns, selectionRoot, partitionDepth, getSchema());
subScan.setOperatorId(this.getOperatorId());
return subScan;
}
@@ -297,7 +298,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
@Override
public String toString() {
String pattern = "EasyGroupScan [selectionRoot=%s, numFiles=%s,
columns=%s, files=%s, schema=%s]";
- return String.format(pattern, selectionRoot, getFiles().size(), columns,
getFiles(), getTableMetadata().getSchema());
+ return String.format(pattern, selectionRoot, getFiles().size(), columns,
getFiles(), getSchema());
}
@Override
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
index 1c793a5..5895ba5 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderUtils.java
@@ -17,11 +17,16 @@
*/
package org.apache.drill.exec.vector.complex.fn;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.vector.complex.impl.ComplexCopier;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.List;
@@ -33,25 +38,51 @@ public class JsonReaderUtils {
boolean allTextMode,
List<BaseWriter.ListWriter>
emptyArrayWriters) {
- List<BaseWriter.MapWriter> writerList = Lists.newArrayList();
- List<PathSegment> fieldPathList = Lists.newArrayList();
+ ensureAtLeastOneField(writer, columns, null /* schema */, allTextMode,
emptyArrayWriters);
+ }
+
+ public static void ensureAtLeastOneField(BaseWriter.ComplexWriter writer,
+ Collection<SchemaPath> columns,
+ TupleMetadata schema,
+ boolean allTextMode,
+ List<BaseWriter.ListWriter>
emptyArrayWriters) {
+
+ List<BaseWriter.MapWriter> writerList = new ArrayList<>();
+ List<PathSegment> fieldPathList = new ArrayList<>();
+ List<TypeProtos.MajorType> types = new ArrayList<>();
BitSet emptyStatus = new BitSet(columns.size());
- int i = 0;
+ int fieldIndex = 0;
// first pass: collect which fields are empty
- for (SchemaPath sp : columns) {
- PathSegment fieldPath = sp.getRootSegment();
+ for (SchemaPath schemaPath : columns) {
+ PathSegment fieldPath = schemaPath.getRootSegment();
BaseWriter.MapWriter fieldWriter = writer.rootAsMap();
+ TupleMetadata columnMetadata = schema;
while (fieldPath.getChild() != null && !fieldPath.getChild().isArray()) {
- fieldWriter = fieldWriter.map(fieldPath.getNameSegment().getPath());
+ String name = fieldPath.getNameSegment().getPath();
+ if (columnMetadata != null) {
+ ColumnMetadata metadata = columnMetadata.metadata(name);
+ columnMetadata = metadata != null ? metadata.mapSchema() : null;
+ }
+ fieldWriter = fieldWriter.map(name);
fieldPath = fieldPath.getChild();
}
writerList.add(fieldWriter);
fieldPathList.add(fieldPath);
+ // for the case when field is absent in the schema, use VARCHAR type
+ // if allTextMode is enabled or INT type if it is disabled
+ TypeProtos.MajorType majorType = allTextMode
+ ? Types.optional(TypeProtos.MinorType.VARCHAR)
+ : Types.optional(TypeProtos.MinorType.INT);
+ if (columnMetadata != null) {
+ ColumnMetadata metadata =
columnMetadata.metadata(fieldPath.getNameSegment().getPath());
+ majorType = metadata != null ? metadata.majorType() : majorType;
+ }
+ types.add(majorType);
if (fieldWriter.isEmptyMap()) {
- emptyStatus.set(i, true);
+ emptyStatus.set(fieldIndex, true);
}
- if (i == 0 && !allTextMode) {
+ if (fieldIndex == 0 && !allTextMode && schema == null) {
// when allTextMode is false, there is not much benefit to producing
all
// the empty fields; just produce 1 field. The reason is that the type
of the
// fields is unknown, so if we produce multiple Integer fields by
default, a
@@ -61,7 +92,7 @@ public class JsonReaderUtils {
// is necessary in order to avoid schema change exceptions by
downstream operators.
break;
}
- i++;
+ fieldIndex++;
}
// second pass: create default typed vectors corresponding to empty fields
@@ -72,11 +103,7 @@ public class JsonReaderUtils {
BaseWriter.MapWriter fieldWriter = writerList.get(j);
PathSegment fieldPath = fieldPathList.get(j);
if (emptyStatus.get(j)) {
- if (allTextMode) {
- fieldWriter.varChar(fieldPath.getNameSegment().getPath());
- } else {
- fieldWriter.integer(fieldPath.getNameSegment().getPath());
- }
+ ComplexCopier.getMapWriterForType(types.get(j), fieldWriter,
fieldPath.getNameSegment().getPath());
}
}
@@ -91,4 +118,4 @@ public class JsonReaderUtils {
}
}
}
-}
\ No newline at end of file
+}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java
b/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java
index cbcf7fb..ca5dfd9 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/metastore/FileSystemMetadataProviderManager.java
@@ -18,11 +18,14 @@
package org.apache.drill.metastore;
import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.store.parquet.ParquetTableMetadataProviderImpl;
import org.apache.drill.metastore.metadata.TableMetadataProvider;
import org.apache.drill.metastore.metadata.TableMetadataProviderBuilder;
+import java.io.IOException;
+
/**
* Implementation of {@link MetadataProviderManager} which uses file system
providers and returns
* builders for file system based {@link TableMetadataProvider} instances.
@@ -38,6 +41,34 @@ public class FileSystemMetadataProviderManager implements
MetadataProviderManage
return new FileSystemMetadataProviderManager();
}
+ /**
+ * Returns a {@link TableMetadataProvider} which provides the specified schema.
+ *
+ * @param schema table schema which should be provided
+ * @return {@link TableMetadataProvider} which provides the specified schema
+ * @throws IOException if an exception happens during {@link TableMetadataProvider} construction
+ */
+ public static TableMetadataProvider
getMetadataProviderForSchema(TupleMetadata schema) throws IOException {
+ return new
FileSystemMetadataProviderManager().builder(MetadataProviderKind.SCHEMA_STATS_ONLY)
+ .withSchema(schema)
+ .build();
+ }
+
+ /**
+ * Returns the {@link TableMetadataProvider} obtained from the specified {@link MetadataProviderManager}
+ * if it is not null. Otherwise a {@link FileSystemMetadataProviderManager} is used to construct
+ * the {@link TableMetadataProvider}.
+ *
+ * @param providerManager metadata provider manager
+ * @return {@link TableMetadataProvider} instance
+ * @throws IOException if an exception happens during {@link TableMetadataProvider} construction
+ */
+ public static TableMetadataProvider
getMetadataProvider(MetadataProviderManager providerManager) throws IOException
{
+ return providerManager == null
+ ? new
FileSystemMetadataProviderManager().builder(MetadataProviderKind.SCHEMA_STATS_ONLY).build()
+ : providerManager.getTableMetadataProvider();
+ }
+
@Override
public void setSchemaProvider(SchemaProvider schemaProvider) {
this.schemaProvider = schemaProvider;
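A minimal sketch of how the new helper can wrap a known schema (the schema built here is invented for the example; a real caller would handle or propagate the declared IOException):

    TupleMetadata schema = new SchemaBuilder()
        .addNullable("id", TypeProtos.MinorType.INT)
        .buildSchema();
    // Wraps the schema into a provider so group scans can consume it as table metadata.
    TableMetadataProvider provider =
        FileSystemMetadataProviderManager.getMetadataProviderForSchema(schema);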
diff --git a/exec/java-exec/src/main/resources/drill-module.conf
b/exec/java-exec/src/main/resources/drill-module.conf
index ea254a5..dbd73ce 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -606,6 +606,7 @@ drill.exec.options: {
store.hive.parquet.optimize_scan_with_native_reader: false,
store.hive.maprdb_json.optimize_scan_with_native_reader: false,
store.hive.maprdb_json.read_timestamp_with_timezone_offset: false,
+ store.hive.maprdb_json.all_text_mode: false,
# Properties values should NOT be set in double-quotes or any other quotes.
# Property name and value should be separated by =.
# Properties should be separated by new line (\n).
diff --git a/exec/vector/src/main/codegen/templates/ComplexCopier.java
b/exec/vector/src/main/codegen/templates/ComplexCopier.java
index 6d7c7e6..c138ef6 100644
--- a/exec/vector/src/main/codegen/templates/ComplexCopier.java
+++ b/exec/vector/src/main/codegen/templates/ComplexCopier.java
@@ -24,6 +24,7 @@
package org.apache.drill.exec.vector.complex.impl;
<#include "/@includes/vv_imports.ftl" />
+import org.apache.drill.common.types.TypeProtos;
/*
* This class is generated using freemarker and the ${.template_name} template.
@@ -90,10 +91,10 @@ public class ComplexCopier {
}
break;
}
- }
+ }
- private static FieldWriter getMapWriterForReader(FieldReader reader,
MapWriter writer, String name) {
- switch (reader.getType().getMinorType()) {
+ public static FieldWriter getMapWriterForType(TypeProtos.MajorType type,
MapWriter writer, String name) {
+ switch (type.getMinorType()) {
<#list vv.types as type><#list type.minor as minor><#assign name =
minor.class?cap_first />
<#assign fields = minor.fields!type.fields />
<#assign uncappedName = name?uncap_first/>
@@ -102,7 +103,7 @@ public class ComplexCopier {
return (FieldWriter) writer.<#if name ==
"Int">integer<#else>${uncappedName}</#if>(name);
<#elseif minor.class?contains("VarDecimal")>
case ${name?upper_case}:
- return (FieldWriter) writer.${uncappedName}(name,
reader.getType().getScale(), reader.getType().getPrecision());
+ return (FieldWriter) writer.${uncappedName}(name, type.getScale(),
type.getPrecision());
</#if>
</#list></#list>
case MAP:
@@ -110,7 +111,7 @@ public class ComplexCopier {
case LIST:
return (FieldWriter) writer.list(name);
default:
- throw new UnsupportedOperationException(reader.getType().toString());
+ throw new UnsupportedOperationException(type.toString());
}
}
@@ -135,4 +136,8 @@ public class ComplexCopier {
throw new UnsupportedOperationException(reader.getType().toString());
}
}
+
+ private static FieldWriter getMapWriterForReader(FieldReader reader,
BaseWriter.MapWriter writer, String name) {
+ return getMapWriterForType(reader.getType(), writer, name);
+ }
}
diff --git
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
index 9bb6b8d..c02da5e 100644
---
a/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
+++
b/exec/vector/src/main/java/org/apache/drill/exec/record/metadata/RepeatedListBuilder.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.record.metadata;
+import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -67,6 +68,35 @@ public class RepeatedListBuilder implements SchemaContainer {
return this;
}
+ public RepeatedListBuilder addArray(MinorType type, int width) {
+ // Existing code uses the repeated list name as the name of
+ // the vector within the list.
+
+ TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+ .setMinorType(type)
+ .setMode(DataMode.REPEATED)
+ .setPrecision(width)
+ .build();
+
+ addColumn(MetadataUtils.newScalar(name, majorType));
+ return this;
+ }
+
+ public RepeatedListBuilder addArray(MinorType type, int precision, int
scale) {
+ // Existing code uses the repeated list name as the name of
+ // the vector within the list.
+
+ TypeProtos.MajorType majorType = TypeProtos.MajorType.newBuilder()
+ .setMinorType(type)
+ .setMode(DataMode.REPEATED)
+ .setPrecision(precision)
+ .setScale(scale)
+ .build();
+
+ addColumn(MetadataUtils.newScalar(name, majorType));
+ return this;
+ }
+
public RepeatedListColumnMetadata buildColumn() {
return MetadataUtils.newRepeatedList(name, child);
}
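For reference, a hedged sketch of the precision/scale-aware overload in use, following the builder pattern from the schema-conversion tests (the column name is invented):

    TupleMetadata schema = new SchemaBuilder()
        .addRepeatedList("prices")
          // array<decimal(28, 14)>: the repeated element carries precision and scale
          .addArray(TypeProtos.MinorType.VARDECIMAL, 28, 14)
          .resumeSchema()
        .buildSchema();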
diff --git
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
index 2bbf3c1..e0dc922 100644
---
a/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
+++
b/metastore/metastore-api/src/main/java/org/apache/drill/metastore/util/SchemaPathUtils.java
@@ -57,6 +57,8 @@ public class SchemaPathUtils {
/**
* Adds column with specified schema path and type into specified {@code
TupleMetadata schema}.
+ * If the specified {@link SchemaPath} has children, the corresponding maps are created
+ * in the {@code TupleMetadata schema} and the last child of the map receives the specified type.
*
* @param schema tuple schema where column should be added
* @param schemaPath schema path of the column which should be added