This is an automated email from the ASF dual-hosted git repository.

arina pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit 139f7156bcb2f6fef5b36f116c9c1b6095fc4b9c Author: Arina Ielchiieva <[email protected]> AuthorDate: Thu Jul 5 20:48:37 2018 +0000 DRILL-6575: Add store.hive.conf.properties option to allow set Hive properties at session level closes #1365 --- ...ertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java | 6 +- .../ConvertHiveParquetScanToDrillParquetScan.java | 21 ++-- .../hive/HiveDrillNativeParquetRowGroupScan.java | 17 +++- .../store/hive/HiveDrillNativeParquetScan.java | 20 +++- .../org/apache/drill/exec/store/hive/HiveScan.java | 105 ++++++++++---------- .../drill/exec/store/hive/HiveStoragePlugin.java | 57 ++++++++--- .../exec/store/hive/HiveStoragePluginConfig.java | 21 ++-- .../apache/drill/exec/store/hive/HiveSubScan.java | 22 +++-- .../drill/exec/store/hive/HiveUtilities.java | 108 ++++++++++++++++----- .../main/resources/bootstrap-storage-plugins.json | 2 +- .../exec/TestHiveDrillNativeParquetReader.java | 17 ++++ .../apache/drill/exec/hive/TestHiveStorage.java | 21 ++++ .../exec/hive/TestInfoSchemaOnHiveStorage.java | 2 + .../exec/store/hive/HiveTestDataGenerator.java | 80 ++++++++------- .../java/org/apache/drill/exec/ExecConstants.java | 3 + .../org/apache/drill/exec/opt/BasicOptimizer.java | 53 +++++----- .../drill/exec/planner/logical/DrillTable.java | 11 ++- .../drill/exec/planner/sql/SqlConverter.java | 34 +++---- .../exec/server/options/SystemOptionManager.java | 43 ++++---- .../drill/exec/store/AbstractStoragePlugin.java | 12 +++ .../org/apache/drill/exec/store/StoragePlugin.java | 45 ++++++--- .../java-exec/src/main/resources/drill-module.conf | 4 + 22 files changed, 457 insertions(+), 247 deletions(-) diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java index 50fee9c..3bc33b3 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java @@ -88,7 +88,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan(); HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry(); HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, - hiveScan.getStoragePlugin().getHiveConf()); + hiveScan.getHiveConf()); if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) { // table is empty, use original scan return; @@ -134,7 +134,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream() .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, colNameSchemaPath)) .collect(Collectors.toList()); - JsonTableGroupScan nariveMapRDBScan = + JsonTableGroupScan nativeMapRDBScan = new JsonTableGroupScan( hiveScan.getUserName(), hiveScan.getStoragePlugin(), @@ -155,7 +155,7 @@ public class ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan extends StoragePlugi hiveScanRel.getCluster(), hiveScanRel.getTraitSet(), hiveScanRel.getTable(), - nariveMapRDBScan, + nativeMapRDBScan, nativeScanRowType, hiveScanCols); } diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java index 2a2f4fb..ea71157 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.planner.sql.logical; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; @@ -43,6 +41,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -88,7 +88,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final Table hiveTable = hiveScan.getHiveReadEntry().getTable(); final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry(); - final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getStoragePlugin().getHiveConf()); + final HiveMetadataProvider hiveMetadataProvider = new HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, hiveScan.getHiveConf()); final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = hiveMetadataProvider.getInputSplits(hiveReadEntry); if (logicalInputSplits.isEmpty()) { @@ -123,7 +123,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim * Create mapping of Hive partition column to directory column mapping. */ private Map<String, String> getPartitionColMapping(final Table hiveTable, final String partitionColumnLabel) { - final Map<String, String> partitionColMapping = Maps.newHashMap(); + final Map<String, String> partitionColMapping = new HashMap<>(); int i = 0; for (FieldSchema col : hiveTable.getPartitionKeys()) { partitionColMapping.put(col.getName(), partitionColumnLabel+i); @@ -143,8 +143,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim final RelDataTypeFactory typeFactory = hiveScanRel.getCluster().getTypeFactory(); final RelDataType varCharType = typeFactory.createSqlType(SqlTypeName.VARCHAR); - final List<String> nativeScanColNames = Lists.newArrayList(); - final List<RelDataType> nativeScanColTypes = Lists.newArrayList(); + final List<String> nativeScanColNames = new ArrayList<>(); + final List<RelDataType> nativeScanColTypes = new ArrayList<>(); for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) { final String dirColName = partitionColMapping.get(field.getName()); if (dirColName != null) { // partition column @@ -161,8 +161,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim // Create the list of projected columns set in HiveScan. The order of this list may not be same as the order of // columns in HiveScan row type. Note: If the HiveScan.getColumn() contains a '*', we just need to add it as it is, // unlike above where we expanded the '*'. HiveScan and related (subscan) can handle '*'. 
- final List<SchemaPath> nativeScanCols = Lists.newArrayList(); - for(SchemaPath colName : hiveScanRel.getColumns()) { + final List<SchemaPath> nativeScanCols = new ArrayList<>(); + for (SchemaPath colName : hiveScanRel.getColumns()) { final String partitionCol = partitionColMapping.get(colName.getRootSegmentPath()); if (partitionCol != null) { nativeScanCols.add(SchemaPath.getSimplePath(partitionCol)); @@ -177,7 +177,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim hiveScan.getUserName(), nativeScanCols, hiveScan.getStoragePlugin(), - logicalInputSplits); + logicalInputSplits, + hiveScan.getConfProperties()); return new DrillScanRel( hiveScanRel.getCluster(), @@ -194,7 +195,7 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel, final Map<String, String> partitionColMapping, final DrillScanRel nativeScanRel) { - final List<RexNode> rexNodes = Lists.newArrayList(); + final List<RexNode> rexNodes = new ArrayList<>(); final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder(); final RelDataType hiveScanRowType = hiveScanRel.getRowType(); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java index e227015..d334ec8 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java @@ -38,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf; import java.io.IOException; import java.util.List; +import java.util.Map; @JsonTypeName("hive-drill-native-parquet-row-group-scan") public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupScan { @@ -45,6 +46,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS private final HiveStoragePlugin hiveStoragePlugin; private final HiveStoragePluginConfig hiveStoragePluginConfig; private final HivePartitionHolder hivePartitionHolder; + private final Map<String, String> confProperties; @JsonCreator public HiveDrillNativeParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry, @@ -53,12 +55,14 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS @JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> rowGroupReadEntries, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder, + @JsonProperty("confProperties") Map<String, String> confProperties, @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException { this(userName, (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), rowGroupReadEntries, columns, hivePartitionHolder, + confProperties, filter); } @@ -67,11 +71,13 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS List<RowGroupReadEntry> rowGroupReadEntries, List<SchemaPath> columns, HivePartitionHolder hivePartitionHolder, + Map<String, String> confProperties, LogicalExpression filter) { super(userName, rowGroupReadEntries, columns, filter); this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, "Could not find format config for the given configuration"); this.hiveStoragePluginConfig = 
hiveStoragePlugin.getConfig(); this.hivePartitionHolder = hivePartitionHolder; + this.confProperties = confProperties; } @JsonProperty @@ -84,6 +90,11 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS return hivePartitionHolder; } + @JsonProperty + public Map<String, String> getConfProperties() { + return confProperties; + } + @JsonIgnore public HiveStoragePlugin getHiveStoragePlugin() { return hiveStoragePlugin; @@ -92,7 +103,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.isEmpty()); - return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter); + return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter); } @Override @@ -102,7 +113,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS @Override public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) { - return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter); + return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, confProperties, filter); } @Override @@ -114,7 +125,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException { Path path = new Path(rowGroupReadEntry.getPath()).getParent(); return new ProjectionPusher().pushProjectionsAndFilters( - new JobConf(hiveStoragePlugin.getHiveConf()), + new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)), path.getParent()); } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java index 03a80d3..a973fa1 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java @@ -62,7 +62,8 @@ import java.util.Map; public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { private final HiveStoragePlugin hiveStoragePlugin; - private HivePartitionHolder hivePartitionHolder; + private final HivePartitionHolder hivePartitionHolder; + private final Map<String, String> confProperties; @JsonCreator public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry engineRegistry, @@ -71,10 +72,12 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("entries") List<ReadEntryWithPath> entries, @JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder, + @JsonProperty("confProperties") Map<String, String> confProperties, @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException { super(ImpersonationUtil.resolveUserName(userName), columns, entries, filter); this.hiveStoragePlugin = (HiveStoragePlugin) engineRegistry.getPlugin(hiveStoragePluginConfig); this.hivePartitionHolder = hivePartitionHolder; + 
this.confProperties = confProperties; init(); } @@ -82,19 +85,22 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { public HiveDrillNativeParquetScan(String userName, List<SchemaPath> columns, HiveStoragePlugin hiveStoragePlugin, - List<LogicalInputSplit> logicalInputSplits) throws IOException { - this(userName, columns, hiveStoragePlugin, logicalInputSplits, ValueExpressions.BooleanExpression.TRUE); + List<LogicalInputSplit> logicalInputSplits, + Map<String, String> confProperties) throws IOException { + this(userName, columns, hiveStoragePlugin, logicalInputSplits, confProperties, ValueExpressions.BooleanExpression.TRUE); } public HiveDrillNativeParquetScan(String userName, List<SchemaPath> columns, HiveStoragePlugin hiveStoragePlugin, List<LogicalInputSplit> logicalInputSplits, + Map<String, String> confProperties, LogicalExpression filter) throws IOException { super(userName, columns, new ArrayList<>(), filter); this.hiveStoragePlugin = hiveStoragePlugin; this.hivePartitionHolder = new HivePartitionHolder(); + this.confProperties = confProperties; for (LogicalInputSplit logicalInputSplit : logicalInputSplits) { Iterator<InputSplit> iterator = logicalInputSplit.getInputSplits().iterator(); @@ -122,6 +128,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { super(that); this.hiveStoragePlugin = that.hiveStoragePlugin; this.hivePartitionHolder = that.hivePartitionHolder; + this.confProperties = that.confProperties; } @JsonProperty @@ -134,6 +141,11 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { return hivePartitionHolder; } + @JsonProperty + public Map<String, String> getConfProperties() { + return confProperties; + } + @Override public SubScan getSpecificScan(int minorFragmentId) { List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId); @@ -142,7 +154,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { List<String> values = hivePartitionHolder.get(readEntry.getPath()); subPartitionHolder.add(readEntry.getPath(), values); } - return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter); + return new HiveDrillNativeParquetRowGroupScan(getUserName(), hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, filter); } @Override diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 11d47f3..d631740 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -50,8 +50,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import static org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns; @@ -64,6 +62,7 @@ public class HiveScan extends AbstractGroupScan { private final HiveStoragePlugin hiveStoragePlugin; private final HiveReadEntry hiveReadEntry; private final HiveMetadataProvider metadataProvider; + private final Map<String, String> confProperties; private List<List<LogicalInputSplit>> mappings; private List<LogicalInputSplit> 
inputSplits; @@ -75,22 +74,24 @@ public class HiveScan extends AbstractGroupScan { @JsonProperty("hiveReadEntry") final HiveReadEntry hiveReadEntry, @JsonProperty("hiveStoragePluginConfig") final HiveStoragePluginConfig hiveStoragePluginConfig, @JsonProperty("columns") final List<SchemaPath> columns, + @JsonProperty("confProperties") final Map<String, String> confProperties, @JacksonInject final StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { this(userName, hiveReadEntry, (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig), columns, - null); + null, confProperties); } public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, final HiveStoragePlugin hiveStoragePlugin, - final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider) throws ExecutionSetupException { + final List<SchemaPath> columns, final HiveMetadataProvider metadataProvider, final Map<String, String> confProperties) throws ExecutionSetupException { super(userName); this.hiveReadEntry = hiveReadEntry; this.columns = columns; this.hiveStoragePlugin = hiveStoragePlugin; + this.confProperties = confProperties; if (metadataProvider == null) { - this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, hiveStoragePlugin.getHiveConf()); + this.metadataProvider = new HiveMetadataProvider(userName, hiveReadEntry, getHiveConf()); } else { this.metadataProvider = metadataProvider; } @@ -102,10 +103,11 @@ public class HiveScan extends AbstractGroupScan { this.hiveReadEntry = that.hiveReadEntry; this.hiveStoragePlugin = that.hiveStoragePlugin; this.metadataProvider = that.metadataProvider; + this.confProperties = that.confProperties; } public HiveScan clone(final HiveReadEntry hiveReadEntry) throws ExecutionSetupException { - return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider); + return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, columns, metadataProvider, confProperties); } @JsonProperty @@ -123,29 +125,37 @@ public class HiveScan extends AbstractGroupScan { return columns; } + @JsonProperty + public Map<String, String> getConfProperties() { + return confProperties; + } + @JsonIgnore public HiveStoragePlugin getStoragePlugin() { return hiveStoragePlugin; } - protected HiveMetadataProvider getMetadataProvider() { - return metadataProvider; + @JsonIgnore + public HiveConf getHiveConf() { + return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties); } - private List<LogicalInputSplit> getInputSplits() { - if (inputSplits == null) { - inputSplits = metadataProvider.getInputSplits(hiveReadEntry); - } - - return inputSplits; + @JsonIgnore + public boolean isNativeReader() { + return false; } + @Override + public boolean supportsPartitionFilterPushdown() { + List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys(); + return !(partitionKeys == null || partitionKeys.size() == 0); + } @Override public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> endpoints) { mappings = new ArrayList<>(); for (int i = 0; i < endpoints.size(); i++) { - mappings.add(new ArrayList<LogicalInputSplit>()); + mappings.add(new ArrayList<>()); } final int count = endpoints.size(); final List<LogicalInputSplit> inputSplits = getInputSplits(); @@ -158,9 +168,9 @@ public class HiveScan extends AbstractGroupScan { public SubScan getSpecificScan(final int minorFragmentId) throws ExecutionSetupException { try { final List<LogicalInputSplit> splits = 
mappings.get(minorFragmentId); - List<HivePartitionWrapper> parts = Lists.newArrayList(); - final List<List<String>> encodedInputSplits = Lists.newArrayList(); - final List<String> splitTypes = Lists.newArrayList(); + List<HivePartitionWrapper> parts = new ArrayList<>(); + final List<List<String>> encodedInputSplits = new ArrayList<>(); + final List<String> splitTypes = new ArrayList<>(); for (final LogicalInputSplit split : splits) { final Partition splitPartition = split.getPartition(); if (splitPartition != null) { @@ -176,7 +186,7 @@ public class HiveScan extends AbstractGroupScan { } final HiveReadEntry subEntry = new HiveReadEntry(hiveReadEntry.getTableWrapper(), parts); - return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin); + return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, splitTypes, columns, hiveStoragePlugin, confProperties); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } @@ -192,7 +202,7 @@ public class HiveScan extends AbstractGroupScan { final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>(); for (final DrillbitEndpoint endpoint : hiveStoragePlugin.getContext().getBits()) { endpointMap.put(endpoint.getAddress(), endpoint); - logger.debug("endpoing address: {}", endpoint.getAddress()); + logger.debug("Endpoint address: {}", endpoint.getAddress()); } final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new HashMap<>(); try { @@ -204,7 +214,7 @@ public class HiveScan extends AbstractGroupScan { for (final LogicalInputSplit split : inputSplits) { final float affinity = ((float) Math.max(1, split.getLength())) / totalSize; for (final String loc : split.getLocations()) { - logger.debug("split location: {}", loc); + logger.debug("Split location: {}", loc); final DrillbitEndpoint endpoint = endpointMap.get(loc); if (endpoint != null) { if (affinityMap.containsKey(endpoint)) { @@ -218,13 +228,8 @@ public class HiveScan extends AbstractGroupScan { } catch (final IOException e) { throw new DrillRuntimeException(e); } - for (final DrillbitEndpoint ep : affinityMap.keySet()) { - Preconditions.checkNotNull(ep); - } - for (final EndpointAffinity a : affinityMap.values()) { - Preconditions.checkNotNull(a.getEndpoint()); - } - return Lists.newArrayList(affinityMap.values()); + + return new ArrayList<>(affinityMap.values()); } @Override @@ -243,21 +248,8 @@ public class HiveScan extends AbstractGroupScan { } } - protected int getSerDeOverheadFactor() { - final int projectedColumnCount; - if (Utilities.isStarQuery(columns)) { - Table hiveTable = hiveReadEntry.getTable(); - projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize(); - } else { - // In cost estimation, # of project columns should be >= 1, even for skipAll query. 
- projectedColumnCount = Math.max(columns.size(), 1); - } - - return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN; - } - @Override - public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) throws ExecutionSetupException { + public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> children) { return new HiveScan(this); } @@ -275,6 +267,7 @@ public class HiveScan extends AbstractGroupScan { + ", numPartitions=" + numPartitions + ", partitions= " + partitions + ", inputDirectories=" + metadataProvider.getInputDirectories(hiveReadEntry) + + ", confProperties=" + confProperties + "]"; } @@ -290,22 +283,24 @@ public class HiveScan extends AbstractGroupScan { return true; } - // Return true if the current table is partitioned false otherwise - public boolean supportsPartitionFilterPushdown() { - final List<FieldSchema> partitionKeys = hiveReadEntry.getTable().getPartitionKeys(); - if (partitionKeys == null || partitionKeys.size() == 0) { - return false; + private List<LogicalInputSplit> getInputSplits() { + if (inputSplits == null) { + inputSplits = metadataProvider.getInputSplits(hiveReadEntry); } - return true; - } - @JsonIgnore - public HiveConf getHiveConf() { - return hiveStoragePlugin.getHiveConf(); + return inputSplits; } - @JsonIgnore - public boolean isNativeReader() { - return false; + private int getSerDeOverheadFactor() { + final int projectedColumnCount; + if (Utilities.isStarQuery(columns)) { + Table hiveTable = hiveReadEntry.getTable(); + projectedColumnCount = hiveTable.getSd().getColsSize() + hiveTable.getPartitionKeysSize(); + } else { + // In cost estimation, # of project columns should be >= 1, even for skipAll query. + projectedColumnCount = Math.max(columns.size(), 1); + } + + return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN; } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index ced8b01..adf1348 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -18,27 +18,34 @@ package org.apache.drill.exec.store.hive; import java.io.IOException; +import java.io.StringReader; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; +import java.util.Properties; import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; import com.google.common.collect.ImmutableSet; import org.apache.calcite.schema.Schema.TableType; import org.apache.calcite.schema.SchemaPlus; +import org.apache.commons.lang3.StringEscapeUtils; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.OptimizerRulesContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan; import org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan; import org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan; import 
org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.SchemaConfig; import org.apache.drill.exec.store.StoragePluginOptimizerRule; @@ -53,17 +60,16 @@ import org.apache.thrift.transport.TTransportException; public class HiveStoragePlugin extends AbstractStoragePlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class); private final HiveStoragePluginConfig config; private HiveSchemaFactory schemaFactory; private final HiveConf hiveConf; - public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) - throws ExecutionSetupException { + public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException { super(context, name); this.config = config; - this.hiveConf = createHiveConf(config.getHiveConfigOverride()); + this.hiveConf = HiveUtilities.generateHiveConf(config.getConfigProps()); this.schemaFactory = new HiveSchemaFactory(this, name, hiveConf); } @@ -76,7 +82,17 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { } @Override + public HiveScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException { + return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, options); + } + + @Override public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException { + return getPhysicalScan(userName, selection, columns, null); + } + + @Override + public HiveScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException { HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){}); try { if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) { @@ -84,7 +100,26 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { "Querying views created in Hive from Drill is not supported in current version."); } - return new HiveScan(userName, hiveReadEntry, this, columns, null); + Map<String, String> confProperties = new HashMap<>(); + if (options != null) { + String value = StringEscapeUtils.unescapeJava(options.getString(ExecConstants.HIVE_CONF_PROPERTIES)); + logger.trace("[{}] is set to {}.", ExecConstants.HIVE_CONF_PROPERTIES, value); + try { + Properties properties = new Properties(); + properties.load(new StringReader(value)); + confProperties = + properties.stringPropertyNames().stream() + .collect( + Collectors.toMap( + Function.identity(), + properties::getProperty, + (o, n) -> n)); + } catch (IOException e) { + logger.warn("Unable to parse Hive conf properties {}, ignoring them.", value); + } + } + + return new HiveScan(userName, hiveReadEntry, this, columns, null, confProperties); } catch (ExecutionSetupException e) { throw new IOException(e); } @@ -181,14 +216,4 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { return ruleBuilder.build(); } - private static HiveConf createHiveConf(final Map<String, String> hiveConfigOverride) { - final HiveConf hiveConf = new HiveConf(); - for(Entry<String, String> config : hiveConfigOverride.entrySet()) { - final String key = 
config.getKey(); - final String value = config.getValue(); - hiveConf.set(key, value); - logger.trace("HiveConfig Override {}={}", key, value); - } - return hiveConf; - } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java index b6f15c8..d812468 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java @@ -19,28 +19,31 @@ package org.apache.drill.exec.store.hive; import java.util.Map; +import com.fasterxml.jackson.annotation.JsonAlias; import org.apache.drill.common.logical.StoragePluginConfigBase; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; @JsonTypeName(HiveStoragePluginConfig.NAME) public class HiveStoragePluginConfig extends StoragePluginConfigBase { - @JsonProperty - public Map<String, String> configProps; public static final String NAME = "hive"; - @JsonIgnore - public Map<String, String> getHiveConfigOverride() { - return configProps; - } + private final Map<String, String> configProps; @JsonCreator - public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> props) { - this.configProps = props; + public HiveStoragePluginConfig(@JsonProperty("configProps") + // previously two names were allowed due to incorrectly written ser / der logic + // allowing to use both during deserialization for backward compatibility + @JsonAlias("config") Map<String, String> configProps) { + this.configProps = configProps; + } + + @JsonProperty + public Map<String, String> getConfigProps() { + return configProps; } @Override diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java index 8ca8647..0acec2d 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java @@ -22,6 +22,7 @@ import java.lang.reflect.Constructor; import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Map; import com.fasterxml.jackson.annotation.JacksonInject; import com.google.common.collect.ImmutableSet; @@ -55,6 +56,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { private final HiveTableWithColumnCache table; private final List<HivePartition> partitions; private final List<SchemaPath> columns; + private final Map<String, String> confProperties; @JsonCreator public HiveSubScan(@JacksonInject StoragePluginRegistry registry, @@ -63,22 +65,24 @@ public class HiveSubScan extends AbstractBase implements SubScan { @JsonProperty("hiveReadEntry") HiveReadEntry hiveReadEntry, @JsonProperty("splitClasses") List<String> splitClasses, @JsonProperty("columns") List<SchemaPath> columns, - @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig) + @JsonProperty("hiveStoragePluginConfig") HiveStoragePluginConfig hiveStoragePluginConfig, + @JsonProperty("confProperties") Map<String, String> confProperties) throws IOException, ExecutionSetupException, 
ReflectiveOperationException { this(userName, splits, hiveReadEntry, splitClasses, columns, - (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig)); + (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), confProperties); } public HiveSubScan(final String userName, final List<List<String>> splits, final HiveReadEntry hiveReadEntry, - final List<String> splitClasses, + final List<String> splitClasses, final List<SchemaPath> columns, - final HiveStoragePlugin hiveStoragePlugin) + final HiveStoragePlugin hiveStoragePlugin, + final Map<String, String> confProperties) throws IOException, ReflectiveOperationException { super(userName); this.hiveReadEntry = hiveReadEntry; @@ -88,6 +92,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { this.splitClasses = splitClasses; this.columns = columns; this.hiveStoragePlugin = hiveStoragePlugin; + this.confProperties = confProperties; for (int i = 0; i < splits.size(); i++) { inputSplits.add(deserializeInputSplit(splits.get(i), splitClasses.get(i))); @@ -119,6 +124,11 @@ public class HiveSubScan extends AbstractBase implements SubScan { return hiveStoragePlugin.getConfig(); } + @JsonProperty + public Map<String, String> getConfProperties() { + return confProperties; + } + @JsonIgnore public HiveTableWithColumnCache getTable() { return table; @@ -141,7 +151,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { @JsonIgnore public HiveConf getHiveConf() { - return hiveStoragePlugin.getHiveConf(); + return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties); } @Override @@ -152,7 +162,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException { try { - return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin); + return new HiveSubScan(getUserName(), splits, hiveReadEntry, splitClasses, columns, hiveStoragePlugin, confProperties); } catch (IOException | ReflectiveOperationException e) { throw new ExecutionSetupException(e); } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java index c8efb65..6fc567e 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java @@ -93,7 +93,14 @@ import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_ public class HiveUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveUtilities.class); - /** Partition value is received in string format. Convert it into appropriate object based on the type. */ + /** + * Partition value is received in string format. Convert it into appropriate object based on the type. + * + * @param typeInfo type info + * @param value partition values + * @param defaultPartitionValue default partition value + * @return converted object + */ public static Object convertPartitionType(TypeInfo typeInfo, String value, final String defaultPartitionValue) { if (typeInfo.getCategory() != Category.PRIMITIVE) { // In Hive only primitive types are allowed as partition column types. @@ -147,6 +154,15 @@ public class HiveUtilities { return null; } + /** + * Populates vector with given value based on its type. 
+ * + * @param vector vector instance + * @param managedBuffer Drill duffer + * @param val value + * @param start start position + * @param end end position + */ public static void populateVector(final ValueVector vector, final DrillBuf managedBuffer, final Object val, final int start, final int end) { TypeProtos.MinorType type = vector.getField().getType().getMinorType(); @@ -307,6 +323,13 @@ public class HiveUtilities { } } + /** + * Obtains major type from given type info holder. + * + * @param typeInfo type info holder + * @param options session options + * @return appropriate major type, null otherwise. For some types may throw unsupported exception. + */ public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo typeInfo, final OptionSet options) { switch (typeInfo.getCategory()) { case PRIMITIVE: { @@ -343,8 +366,14 @@ public class HiveUtilities { return null; } - public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, - OptionSet options) { + /** + * Obtains minor type from given primitive type info holder. + * + * @param primitiveTypeInfo primitive type info holder + * @param options session options + * @return appropriate minor type, otherwise throws unsupported type exception + */ + public static TypeProtos.MinorType getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, OptionSet options) { switch(primitiveTypeInfo.getPrimitiveCategory()) { case BINARY: return TypeProtos.MinorType.VARBINARY; @@ -392,10 +421,8 @@ public class HiveUtilities { * @param job {@link JobConf} instance needed incase the table is StorageHandler based table. * @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables). * @param table Table object - * @throws Exception */ - public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, - final Table table) throws Exception { + public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table) throws Exception { final String inputFormatName = sd.getInputFormat(); if (Strings.isNullOrEmpty(inputFormatName)) { final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE); @@ -426,26 +453,23 @@ public class HiveUtilities { } /** - * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)} which also adds parameters from table - * to properties returned by {@link MetaStoreUtils#getPartitionMetadata(Partition, Table)}. + * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition, Table)} + * which also adds parameters from table to properties returned by that method. * * @param partition the source of partition level parameters * @param table the source of table level parameters * @return properties */ public static Properties getPartitionMetadata(final HivePartition partition, final HiveTableWithColumnCache table) { - final Properties properties; restoreColumns(table, partition); - properties = MetaStoreUtils.getPartitionMetadata(partition, table); + Properties properties = MetaStoreUtils.getPartitionMetadata(partition, table); // SerDe expects properties from Table, but above call doesn't add Table properties. // Include Table properties in final list in order to not to break SerDes that depend on // Table properties. 
For example AvroSerDe gets the schema from properties (passed as second argument) - for (Map.Entry<String, String> entry : table.getParameters().entrySet()) { - if (entry.getKey() != null && entry.getKey() != null) { - properties.put(entry.getKey(), entry.getValue()); - } - } + table.getParameters().entrySet().stream() + .filter(e -> e.getKey() != null && e.getValue() != null) + .forEach(e -> properties.put(e.getKey(), e.getValue())); return properties; } @@ -453,8 +477,8 @@ public class HiveUtilities { /** * Sets columns from table cache to table and partition. * + * @param table the source of column lists cache * @param partition partition which will set column list - * @param table the source of column lists cache */ public static void restoreColumns(HiveTableWithColumnCache table, HivePartition partition) { // exactly the same column lists for partitions or table @@ -471,6 +495,9 @@ public class HiveUtilities { * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)} * which also sets columns from table cache to table and returns properties returned by * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, Map, String, String, List)}. + * + * @param table Hive table with cached columns + * @return Hive table metadata */ public static Properties getTableMetadata(HiveTableWithColumnCache table) { restoreColumns(table, null); @@ -478,13 +505,18 @@ public class HiveUtilities { table.getDbName(), table.getTableName(), table.getPartitionKeys()); } + /** + * Generates unsupported types exception message with list of supported types + * and throws user exception. + * + * @param unsupportedType unsupported type + */ public static void throwUnsupportedHiveDataTypeError(String unsupportedType) { - StringBuilder errMsg = new StringBuilder(); - errMsg.append(String.format("Unsupported Hive data type %s. ", unsupportedType)); - errMsg.append(System.getProperty("line.separator")); - errMsg.append("Following Hive data types are supported in Drill for querying: "); - errMsg.append( - "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR"); + StringBuilder errMsg = new StringBuilder() + .append("Unsupported Hive data type ").append(unsupportedType).append(". ") + .append(System.lineSeparator()) + .append("Following Hive data types are supported in Drill for querying: ") + .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR"); throw UserException.unsupportedError() .message(errMsg.toString()) @@ -633,7 +665,7 @@ public class HiveUtilities { } /** - * Get the input format from given {@link StorageDescriptor} + * Get the input format from given {@link StorageDescriptor}. * * @param properties table properties * @param hiveReadEntry hive read entry @@ -681,5 +713,35 @@ public class HiveUtilities { } return false; } + + /** + * Creates HiveConf based on given list of configuration properties. + * + * @param properties config properties + * @return instance of HiveConf + */ + public static HiveConf generateHiveConf(Map<String, String> properties) { + logger.trace("Override HiveConf with the following properties {}", properties); + HiveConf hiveConf = new HiveConf(); + properties.forEach(hiveConf::set); + return hiveConf; + } + + /** + * Creates HiveConf based on properties in given HiveConf and configuration properties. 
+ * + * @param hiveConf hive conf + * @param properties config properties + * @return instance of HiveConf + */ + public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, String> properties) { + Properties changedProperties = hiveConf.getChangedProperties(); + changedProperties.putAll(properties); + HiveConf newHiveConf = new HiveConf(); + changedProperties.stringPropertyNames() + .forEach(name -> newHiveConf.set(name, changedProperties.getProperty(name))); + return newHiveConf; + } + } diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json index d06220f..018189c 100644 --- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json @@ -2,7 +2,7 @@ "storage":{ hive : { type:"hive", - config : { + configProps : { "hive.metastore.uris" : "", "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true", "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh", diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java index 556deb2..ea8d5df 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java @@ -152,6 +152,12 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { // checks only group scan PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native"); PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext"); + try { + alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true"); + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table"); + } finally { + resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES); + } } @Test @@ -243,4 +249,15 @@ public class TestHiveDrillNativeParquetReader extends HiveTestBase { testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"}); } + @Test + public void testHiveConfPropertiesAtSessionLevel() throws Exception { + String query = "select * from hive.sub_dir_table"; + try { + alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true"); + test(query); + } finally { + resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES); + } + } + } diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index 25393e7..94f39b8 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -412,6 +412,27 @@ public class TestHiveStorage extends HiveTestBase { public void testPhysicalPlanSubmission() throws Exception { PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv"); PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.readtest"); + try { + alterSession(ExecConstants.HIVE_CONF_PROPERTIES, 
"hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true"); + PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table"); + } finally { + resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES); + } + } + + @Test + public void testHiveConfPropertiesAtSessionLevel() throws Exception { + String query = "select * from hive.sub_dir_table"; + try { + alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true"); + test(query); + } finally { + resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES); + } + + thrown.expect(UserRemoteException.class); + thrown.expectMessage(containsString("IOException: Not a file")); + test(query); } private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> columnsList, Map<String, Integer> expectedResult) { diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java index 80da976..c5c0d48 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java @@ -50,6 +50,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase { .baselineValues("hive.default", "partition_with_few_schemas") .baselineValues("hive.default", "kv_native") .baselineValues("hive.default", "kv_native_ext") + .baselineValues("hive.default", "sub_dir_table") .go(); testBuilder() @@ -254,6 +255,7 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase { .baselineValues("DRILL", "hive.default", "simple_json", "TABLE") .baselineValues("DRILL", "hive.default", "kv_native", "TABLE") .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE") + .baselineValues("DRILL", "hive.default", "sub_dir_table", "TABLE") .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE") .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE") .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", "TABLE") diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index f206999..074cb3b 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -25,6 +25,7 @@ import java.nio.file.Paths; import java.nio.file.attribute.PosixFilePermission; import java.sql.Date; import java.sql.Timestamp; +import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -42,7 +43,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; -import com.google.common.collect.Maps; import org.apache.hadoop.hive.serde.serdeConstants; import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery; @@ -80,7 +80,7 @@ public class HiveTestDataGenerator { this.whDir = whDir; this.dirTestWatcher = dirTestWatcher; - config = Maps.newHashMap(); + config = new HashMap<>(); config.put(ConfVars.METASTOREURIS.toString(), ""); config.put("javax.jdo.option.ConnectionURL", String.format("jdbc:derby:;databaseName=%s;create=true", dbDir)); 
config.put("hive.metastore.warehouse.dir", whDir); @@ -89,7 +89,9 @@ public class HiveTestDataGenerator { /** * Add Hive test storage plugin to the given plugin registry. - * @throws Exception + * + * @param pluginRegistry storage plugin registry + * @throws Exception in case if unable to update Hive storage plugin */ public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) throws Exception { HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config); @@ -101,7 +103,8 @@ public class HiveTestDataGenerator { /** * Update the current HiveStoragePlugin in given plugin registry with given <i>configOverride</i>. * - * @param configOverride + * @param pluginRegistry storage plugin registry + * @param configOverride config properties to be overridden * @throws DrillException if fails to update or no Hive plugin currently exists in given plugin registry. */ public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, Map<String, String> configOverride) @@ -113,7 +116,7 @@ public class HiveTestDataGenerator { } HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig(); - newPluginConfig.getHiveConfigOverride().putAll(configOverride); + newPluginConfig.getConfigProps().putAll(configOverride); pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, true); } @@ -344,7 +347,7 @@ public class HiveTestDataGenerator { "charType CHAR(10))" ); - /** + /* * Create a PARQUET table with all supported types. */ executeQuery(hiveDriver, @@ -542,6 +545,8 @@ public class HiveTestDataGenerator { createTestDataForDrillNativeParquetReaderTests(hiveDriver); + createSubDirTable(hiveDriver, testDataFile); + ss.close(); } @@ -594,56 +599,61 @@ public class HiveTestDataGenerator { "location '%s'", thirdPartition)); } + private void createSubDirTable(Driver hiveDriver, String testDataFile) { + String tableName = "sub_dir_table"; + dirTestWatcher.copyResourceToRoot(Paths.get(testDataFile), Paths.get(tableName, "sub_dir", "data.txt")); + + String tableLocation = Paths.get(dirTestWatcher.getRootDir().toURI().getPath(), tableName).toUri().getPath(); + + String tableDDL = String.format("create external table sub_dir_table (key int, value string) " + + "row format delimited fields terminated by ',' stored as textfile location '%s'", tableLocation); + executeQuery(hiveDriver, tableDDL); + } + private File getTempFile() throws Exception { return java.nio.file.Files.createTempFile("drill-hive-test", ".txt").toFile(); } private String generateTestDataFile() throws Exception { - final File file = getTempFile(); - PrintWriter printWriter = new PrintWriter(file); - for (int i=1; i<=5; i++) { - printWriter.println (String.format("%d, key_%d", i, i)); + File file = getTempFile(); + try (PrintWriter printWriter = new PrintWriter(file)) { + for (int i = 1; i <= 5; i++) { + printWriter.println(String.format("%d, key_%d", i, i)); + } } - printWriter.close(); - return file.getPath(); } private String generateTestDataFileForPartitionInput() throws Exception { - final File file = getTempFile(); - - PrintWriter printWriter = new PrintWriter(file); - - String partValues[] = {"1", "2", "null"}; - - for(int c = 0; c < partValues.length; c++) { - for(int d = 0; d < partValues.length; d++) { - for(int e = 0; e < partValues.length; e++) { - for (int i = 1; i <= 5; i++) { - Date date = new Date(System.currentTimeMillis()); - Timestamp ts = new Timestamp(System.currentTimeMillis()); - printWriter.printf("%s,%s,%s,%s,%s", - date.toString(), ts.toString(), partValues[c], partValues[d], 
partValues[e]); - printWriter.println(); + File file = getTempFile(); + try (PrintWriter printWriter = new PrintWriter(file)) { + String partValues[] = {"1", "2", "null"}; + for (String partValue : partValues) { + for (String partValue1 : partValues) { + for (String partValue2 : partValues) { + for (int i = 1; i <= 5; i++) { + Date date = new Date(System.currentTimeMillis()); + Timestamp ts = new Timestamp(System.currentTimeMillis()); + printWriter.printf("%s,%s,%s,%s,%s", date.toString(), ts.toString(), partValue, partValue1, partValue2); + printWriter.println(); + } } } } } - printWriter.close(); - return file.getPath(); } private String generateAllTypesDataFile() throws Exception { File file = getTempFile(); - PrintWriter printWriter = new PrintWriter(file); - printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689," + - "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield," + - "2013-07-05 17:01:00,2013-07-05,charfield"); - printWriter.println(",,,,,,,,,,,,,,,,"); - printWriter.close(); + try (PrintWriter printWriter = new PrintWriter(file)) { + printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"+ + "89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"+ + "2013-07-05 17:01:00,2013-07-05,charfield"); + printWriter.println(",,,,,,,,,,,,,,,,"); + } return file.getPath(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 49f149b..4c840a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -410,6 +410,9 @@ public final class ExecConstants { public static final OptionValidator HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR = new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER); + public static final String HIVE_CONF_PROPERTIES = "store.hive.conf.properties"; + public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new StringValidator(HIVE_CONF_PROPERTIES); + public static final String SLICE_TARGET = "planner.slice_target"; public static final long SLICE_TARGET_DEFAULT = 100000l; public static final PositiveLongValidator SLICE_TARGET_OPTION = new PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 36e74a0..e7518b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.opt; -import com.google.common.collect.Lists; - import org.apache.drill.common.JSONOptions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -28,9 +26,7 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.logical.data.Filter; import org.apache.drill.common.logical.data.GroupingAggregate; import org.apache.drill.common.logical.data.Join; -import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.LogicalOperator; -import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; import 
org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.common.logical.data.Project; @@ -53,6 +49,7 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.server.options.OptionManager; @@ -64,6 +61,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.stream.Collectors; public class BasicOptimizer extends Optimizer { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class); @@ -99,8 +97,7 @@ public class BasicOptimizer extends Optimizer { .version(logicalProperties.version) .generator(logicalProperties.generator) .options(new JSONOptions(context.getOptions().getOptionList())).build(); - final PhysicalPlan p = new PhysicalPlan(props, physOps); - return p; + return new PhysicalPlan(props, physOps); } public static class BasicOptimizationContext implements OptimizationContext { @@ -128,30 +125,28 @@ public class BasicOptimizer extends Optimizer { */ private final LogicalPlan logicalPlan; - public LogicalConverter(final LogicalPlan logicalPlan) { + LogicalConverter(final LogicalPlan logicalPlan) { this.logicalPlan = logicalPlan; } @Override public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException { - final List<Ordering> orderDefs = Lists.newArrayList(); PhysicalOperator input = groupBy.getInput().accept(this, value); if (groupBy.getKeys().size() > 0) { - for(NamedExpression e : groupBy.getKeys()) { - orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST)); - } + List<Ordering> orderDefs = groupBy.getKeys().stream() + .map(e -> new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST)) + .collect(Collectors.toList()); input = new Sort(input, orderDefs, false); } - final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f); - return sa; + return new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f); } @Override public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException { PhysicalOperator input = window.getInput().accept(this, value); - final List<Ordering> ods = Lists.newArrayList(); + final List<Ordering> ods = new ArrayList<>(); input = new Sort(input, ods, false); @@ -162,11 +157,7 @@ public class BasicOptimizer extends Optimizer { @Override public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException { final PhysicalOperator input = order.getInput().accept(this, value); - final List<Ordering> ods = Lists.newArrayList(); - for (Ordering o : order.getOrderings()){ - ods.add(o); - } - + final List<Ordering> ods = new ArrayList<>(order.getOrderings()); return new SelectionVectorRemover(new Sort(input, ods, false)); } @@ -180,18 +171,20 @@ public class BasicOptimizer extends Optimizer { @Override public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException { PhysicalOperator leftOp = join.getLeft().accept(this, value); - final List<Ordering> leftOrderDefs = Lists.newArrayList(); - for(JoinCondition jc 
: join.getConditions()){ - leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft())); - } + + List<Ordering> leftOrderDefs = join.getConditions().stream() + .map(jc -> new Ordering(Direction.ASCENDING, jc.getLeft())) + .collect(Collectors.toList()); + leftOp = new Sort(leftOp, leftOrderDefs, false); leftOp = new SelectionVectorRemover(leftOp); PhysicalOperator rightOp = join.getRight().accept(this, value); - final List<Ordering> rightOrderDefs = Lists.newArrayList(); - for(JoinCondition jc : join.getConditions()){ - rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight())); - } + + List<Ordering> rightOrderDefs = join.getConditions().stream() + .map(jc -> new Ordering(Direction.ASCENDING, jc.getRight())) + .collect(Collectors.toList()); + rightOp = new Sort(rightOp, rightOrderDefs, false); rightOp = new SelectionVectorRemover(rightOp); @@ -210,7 +203,7 @@ public class BasicOptimizer extends Optimizer { try { final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config); final String user = userSession.getSession().getCredentials().getUserName(); - return storagePlugin.getPhysicalScan(user, scan.getSelection()); + return storagePlugin.getPhysicalScan(user, scan.getSelection(), userSession.getSession().getOptions()); } catch (IOException | ExecutionSetupException e) { throw new OptimizerException("Failure while attempting to retrieve storage engine.", e); } @@ -241,8 +234,8 @@ public class BasicOptimizer extends Optimizer { } @Override - public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) throws OptimizerException { - return new org.apache.drill.exec.physical.config.UnnestPOP(null, unnest.getColumn()); + public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) { + return new UnnestPOP(null, unnest.getColumn()); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java index 4ee7671..53036f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java @@ -32,6 +32,7 @@ import org.apache.drill.common.JSONOptions; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.SchemalessScan; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.server.options.SessionOptionManager; import org.apache.drill.exec.store.StoragePlugin; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.util.ImpersonationUtil; @@ -45,6 +46,7 @@ public abstract class DrillTable implements Table { private final StoragePlugin plugin; private final String userName; private GroupScan scan; + private SessionOptionManager options; /** * Creates a DrillTable instance for a @{code TableType#Table} table. 
@@ -85,12 +87,16 @@ public abstract class DrillTable implements Table { this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), selection); } + public void setOptions(SessionOptionManager options) { + this.options = options; + } + public GroupScan getGroupScan() throws IOException{ if (scan == null) { if (selection instanceof FileSelection && ((FileSelection) selection).isEmptyDirectory()) { this.scan = new SchemalessScan(userName, ((FileSelection) selection).getSelectionRoot()); } else { - this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection)); + this.scan = plugin.getPhysicalScan(userName, new JSONOptions(selection), options); } } return scan; @@ -138,7 +144,8 @@ public abstract class DrillTable implements Table { return true; } - @Override public boolean isRolledUp(String column) { + @Override + public boolean isRolledUp(String column) { return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index 3f65ad2..d6b0951 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -78,6 +78,7 @@ import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.logical.DrillConstExecutor; import org.apache.drill.exec.planner.logical.DrillRelFactories; +import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.rpc.user.UserSession; @@ -113,7 +114,6 @@ public class SqlConverter { private final DrillConfig drillConfig; private RelOptCluster cluster; - private String sql; private VolcanoPlanner planner; private boolean useRootSchema = false; @@ -187,12 +187,10 @@ public class SqlConverter { public SqlNode validate(final SqlNode parsedNode) { try { - SqlNode validatedNode = validator.validate(parsedNode); - return validatedNode; + return validator.validate(parsedNode); } catch (RuntimeException e) { UserException.Builder builder = UserException - .validationError(e) - .addContext("SQL Query", sql); + .validationError(e); if (isInnerQuery) { builder.message("Failure validating a view your query is dependent upon."); } @@ -240,7 +238,7 @@ public class SqlConverter { private class DrillValidator extends SqlValidatorImpl { - protected DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, + DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader catalogReader, RelDataTypeFactory typeFactory, SqlConformance conformance) { super(opTab, catalogReader, typeFactory, conformance); } @@ -382,15 +380,11 @@ public class SqlConverter { //To avoid unexpected column errors set a value of top to false final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, false); - final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true)); - return rel2; + return rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true)); } private class Expander implements RelOptTable.ViewExpander { - public Expander() { - } - @Override public RelRoot expandView(RelDataType rowType, String queryString, List<String> schemaPath, List<String> viewPath) { final DrillCalciteCatalogReader catalogReader = new DrillCalciteCatalogReader( @@ -550,7 +544,7 @@ 
public class SqlConverter { * also preserving nullability. * * <p>Tries to expand the cast, and therefore the result may be something - * other than a {@link RexCall} to the CAST operator, such as a + * other than a {@link org.apache.calcite.rex.RexCall} to the CAST operator, such as a * {@link RexLiteral} if {@code matchNullability} is false. * * @param type Type to cast to @@ -611,7 +605,7 @@ public class SqlConverter { /** * Disallow temporary tables presence in sql statement (ex: in view definitions) */ - public void disallowTemporaryTables() { + void disallowTemporaryTables() { this.allowTemporaryTables = false; } @@ -647,13 +641,19 @@ public class SqlConverter { } } - return super.getTable(names); + Prepare.PreparingTable table = super.getTable(names); + DrillTable unwrap; + // add session options if found table is Drill table + if (table != null && (unwrap = table.unwrap(DrillTable.class)) != null) { + unwrap.setOptions(session.getOptions()); + } + return table; } @Override public List<List<String>> getSchemaPaths() { if (useRootSchema) { - return ImmutableList.<List<String>>of(ImmutableList.<String>of()); + return ImmutableList.of(ImmutableList.of()); } return super.getSchemaPaths(); } @@ -662,8 +662,8 @@ public class SqlConverter { * check if the schema provided is a valid schema: * <li>schema is not indicated (only one element in the names list)<li/> * - * @param names list of schema and table names, table name is always the last element - * @return throws a userexception if the schema is not valid. + * @param names list of schema and table names, table name is always the last element + * @throws UserException if the schema is not valid. */ private void isValidSchema(final List<String> names) throws UserException { SchemaPlus defaultSchema = session.getDefaultSchema(this.rootSchema); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index a9c4742..a16bb4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -17,11 +17,14 @@ */ package org.apache.drill.exec.server.options; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.commons.collections.IteratorUtils; import org.apache.drill.common.config.DrillConfig; @@ -40,15 +43,13 @@ import org.apache.drill.exec.util.AssertionUtil; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - -/** - * {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}. - * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and - * persist between restarts. - */ /** + * <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}. + * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and + * persist between restarts. + * </p> + * * <p> All the system options are externalized into conf file. 
While adding a new system option * a validator should be added and the default value for the option should be set in * the conf files(example : drill-module.conf) under the namespace drill.exec.options. @@ -173,6 +174,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR), new OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR), + new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR), new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION), new OptionDefinition(ExecConstants.AFFINITY_FACTOR), new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL), @@ -237,11 +239,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)), }; - final CaseInsensitiveMap<OptionDefinition> map = CaseInsensitiveMap.newHashMap(); + CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions) + .collect(Collectors.toMap( + d -> d.getValidator().getOptionName(), + Function.identity(), + (o, n) -> n, + CaseInsensitiveMap::newHashMap)); - for (final OptionDefinition definition: definitions) { - map.put(definition.getValidator().getOptionName(), definition); - } if (AssertionUtil.isAssertionsEnabled()) { map.put(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, new OptionDefinition(ExecConstants.DRILLBIT_CONTROLS_VALIDATOR)); @@ -295,7 +299,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea * Initializes this option manager. 
* * @return this option manager - * @throws Exception + * @throws Exception if unable to initialize option manager */ public SystemOptionManager init() throws Exception { options = provider.getOrCreateStore(config); @@ -395,16 +399,13 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea @Override public void deleteAllLocalOptions() { - final Set<String> names = Sets.newHashSet(); - for (final Map.Entry<String, PersistedOptionValue> entry : Lists.newArrayList(options.getAll())) { - names.add(entry.getKey()); - } - for (final String name : names) { - options.delete(name); // should be lowercase - } + Iterable<Map.Entry<String, PersistedOptionValue>> allOptions = () -> options.getAll(); + StreamSupport.stream(allOptions.spliterator(), false) + .map(Entry::getKey) + .forEach(name -> options.delete(name)); // should be lowercase } - public static CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) { + private CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig bootConfig) { // populate the options from the config final Map<String, OptionValue> defaults = new HashMap<>(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index de2b2c3..d37a0a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.PlannerPhase; import com.google.common.collect.ImmutableSet; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SessionOptionManager; /** Abstract class for StorePlugin implementations. * See StoragePlugin for description of the interface intent and its methods. 
@@ -102,12 +103,23 @@ public abstract class AbstractStoragePlugin implements StoragePlugin { } } + + @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException { + return getPhysicalScan(userName, selection); + } + @Override public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException { return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS); } @Override + public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException { + return getPhysicalScan(userName, selection, columns); + } + + @Override public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException { throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java index 50f2731..2617065 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java @@ -27,6 +27,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.server.options.SessionOptionManager; /** Interface for all implementations of the storage plugins. Different implementations of the storage * formats will implement methods that indicate if Drill can write or read its tables from that format, @@ -36,18 +37,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable { /** Indicates if Drill can read the table from this format. */ - public boolean supportsRead(); + boolean supportsRead(); /** Indicates if Drill can write a table to this format (e.g. as JSON, csv, etc.). */ - public boolean supportsWrite(); + boolean supportsWrite(); /** An implementation of this method will return one or more specialized rules that Drill query * optimizer can leverage in <i>physical</i> space. Otherwise, it should return an empty set. * @return an empty set or a set of plugin specific physical optimizer rules. */ @Deprecated - public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext); + Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext optimizerContext); /** * Get the physical scan operator for the particular GroupScan (read) node. @@ -55,9 +56,18 @@ public interface StoragePlugin extends SchemaFactory, AutoCloseable { * @param userName User whom to impersonate when reading the contents as part of Scan. * @param selection The configured storage engine specific selection. * @return The physical scan operator for the particular GroupScan (read) node. - * @throws IOException */ - public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException; + AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) throws IOException; + + /** + * Get the physical scan operator for the particular GroupScan (read) node. + * + * @param userName User whom to impersonate when reading the contents as part of Scan. + * @param selection The configured storage engine specific selection. 
+ * @param options (optional) session options + * @return The physical scan operator for the particular GroupScan (read) node. + */ + AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, SessionOptionManager options) throws IOException; /** * Get the physical scan operator for the particular GroupScan (read) node. @@ -66,18 +76,29 @@ * @param selection The configured storage engine specific selection. * @param columns (optional) The list of column names to scan from the data source. * @return The physical scan operator for the particular GroupScan (read) node. - * @throws IOException */ - public AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) - throws IOException; + AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns) throws IOException; - /** Method returns a Jackson serializable object that extends a StoragePluginConfig - * @return an extension of StoragePluginConfig + /** + * Get the physical scan operator for the particular GroupScan (read) node. + * + * @param userName User whom to impersonate when reading the contents as part of Scan. + * @param selection The configured storage engine specific selection. + * @param columns (optional) The list of column names to scan from the data source. + * @param options (optional) session options + * @return The physical scan operator for the particular GroupScan (read) node. + */ + AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, List<SchemaPath> columns, SessionOptionManager options) throws IOException; + + /** + * Method returns a Jackson serializable object that extends a StoragePluginConfig. + * + * @return an extension of StoragePluginConfig */ - public StoragePluginConfig getConfig(); + StoragePluginConfig getConfig(); /** * Initialize the storage plugin. The storage plugin will not be used until this method is called. */ - public void start() throws IOException; + void start() throws IOException; } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 4b1e9dd..b0cc209 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -555,6 +555,10 @@ drill.exec.options: { store.hive.optimize_scan_with_native_readers: false, store.hive.parquet.optimize_scan_with_native_reader: false, store.hive.maprdb_json.optimize_scan_with_native_reader: false, + # Property values should NOT be set in double-quotes or any other quotes. + # Property name and value should be separated by =. + # Properties should be separated by new line (\n). + store.hive.conf.properties: "", store.json.all_text_mode: false, store.json.writer.allow_nan_inf: true, store.json.reader.allow_nan_inf: true,
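
The new store.hive.conf.properties option defaults to an empty string (no extra Hive properties) and, per the comment added to drill-module.conf above, takes one name=value pair per line with no quoting around values. Below is a minimal, self-contained sketch that only illustrates this value format by loading it into java.util.Properties; it is not the parsing code Drill itself uses, and the property names and the ALTER SESSION statement shown in the comment are illustrative assumptions only.

import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class HiveConfPropertiesFormatExample {
  public static void main(String[] args) throws IOException {
    // Hypothetical session-level value; it could be set with something like:
    //   ALTER SESSION SET `store.hive.conf.properties` = 'hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true';
    // The property names here are examples only.
    String optionValue = "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true";

    // java.util.Properties already understands "name=value" pairs separated by new lines,
    // which is the same format described by the drill-module.conf comment.
    Properties props = new Properties();
    props.load(new StringReader(optionValue));

    props.forEach((name, value) -> System.out.println(name + " -> " + value));
  }
}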
