[ 
https://issues.apache.org/jira/browse/DRILL-6575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16536846#comment-16536846
 ] 

ASF GitHub Bot commented on DRILL-6575:
---------------------------------------

asfgit closed pull request #1365: DRILL-6575: Add store.hive.conf.properties 
option to allow set Hive properties at session level
URL: https://github.com/apache/drill/pull/1365
 
 
   

This PR was merged from a forked repository (a "foreign" pull request).
Because GitHub does not display the original diff of such a pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
index 50fee9c893b..3bc33b331d5 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan.java
@@ -88,7 +88,7 @@ public void onMatch(RelOptRuleCall call) {
       HiveScan hiveScan = (HiveScan) hiveScanRel.getGroupScan();
       HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
       HiveMetadataProvider hiveMetadataProvider = new 
HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry,
-          hiveScan.getStoragePlugin().getHiveConf());
+          hiveScan.getHiveConf());
       if (hiveMetadataProvider.getInputSplits(hiveReadEntry).isEmpty()) {
         // table is empty, use original scan
         return;
@@ -134,7 +134,7 @@ private DrillScanRel createNativeScanRel(final DrillScanRel 
hiveScanRel) throws
     List<SchemaPath> hiveScanCols = hiveScanRel.getColumns().stream()
         .map(colNameSchemaPath -> replaceOverriddenSchemaPath(parameters, 
colNameSchemaPath))
         .collect(Collectors.toList());
-    JsonTableGroupScan nariveMapRDBScan =
+    JsonTableGroupScan nativeMapRDBScan =
         new JsonTableGroupScan(
             hiveScan.getUserName(),
             hiveScan.getStoragePlugin(),
@@ -155,7 +155,7 @@ private DrillScanRel createNativeScanRel(final DrillScanRel 
hiveScanRel) throws
         hiveScanRel.getCluster(),
         hiveScanRel.getTraitSet(),
         hiveScanRel.getTable(),
-        nariveMapRDBScan,
+        nativeMapRDBScan,
         nativeScanRowType,
         hiveScanCols);
   }
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 2a2f4fbce67..ea711572dc8 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.planner.sql.logical;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeFactory;
@@ -43,6 +41,8 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -88,7 +88,7 @@ public void onMatch(RelOptRuleCall call) {
       final Table hiveTable = hiveScan.getHiveReadEntry().getTable();
       final HiveReadEntry hiveReadEntry = hiveScan.getHiveReadEntry();
 
-      final HiveMetadataProvider hiveMetadataProvider = new 
HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, 
hiveScan.getStoragePlugin().getHiveConf());
+      final HiveMetadataProvider hiveMetadataProvider = new 
HiveMetadataProvider(hiveScan.getUserName(), hiveReadEntry, 
hiveScan.getHiveConf());
       final List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits = 
hiveMetadataProvider.getInputSplits(hiveReadEntry);
 
       if (logicalInputSplits.isEmpty()) {
@@ -123,7 +123,7 @@ Hive does not always give correct costing (i.e. for 
external tables Hive does no
    * Create mapping of Hive partition column to directory column mapping.
    */
   private Map<String, String> getPartitionColMapping(final Table hiveTable, 
final String partitionColumnLabel) {
-    final Map<String, String> partitionColMapping = Maps.newHashMap();
+    final Map<String, String> partitionColMapping = new HashMap<>();
     int i = 0;
     for (FieldSchema col : hiveTable.getPartitionKeys()) {
       partitionColMapping.put(col.getName(), partitionColumnLabel+i);
@@ -143,8 +143,8 @@ private DrillScanRel createNativeScanRel(final Map<String, 
String> partitionColM
     final RelDataTypeFactory typeFactory = 
hiveScanRel.getCluster().getTypeFactory();
     final RelDataType varCharType = 
typeFactory.createSqlType(SqlTypeName.VARCHAR);
 
-    final List<String> nativeScanColNames = Lists.newArrayList();
-    final List<RelDataType> nativeScanColTypes = Lists.newArrayList();
+    final List<String> nativeScanColNames = new ArrayList<>();
+    final List<RelDataType> nativeScanColTypes = new ArrayList<>();
     for (RelDataTypeField field : hiveScanRel.getRowType().getFieldList()) {
       final String dirColName = partitionColMapping.get(field.getName());
       if (dirColName != null) { // partition column
@@ -161,8 +161,8 @@ private DrillScanRel createNativeScanRel(final Map<String, 
String> partitionColM
     // Create the list of projected columns set in HiveScan. The order of this 
list may not be same as the order of
     // columns in HiveScan row type. Note: If the HiveScan.getColumn() 
contains a '*', we just need to add it as it is,
     // unlike above where we expanded the '*'. HiveScan and related (subscan) 
can handle '*'.
-    final List<SchemaPath> nativeScanCols = Lists.newArrayList();
-    for(SchemaPath colName : hiveScanRel.getColumns()) {
+    final List<SchemaPath> nativeScanCols = new ArrayList<>();
+    for (SchemaPath colName : hiveScanRel.getColumns()) {
       final String partitionCol = 
partitionColMapping.get(colName.getRootSegmentPath());
       if (partitionCol != null) {
         nativeScanCols.add(SchemaPath.getSimplePath(partitionCol));
@@ -177,7 +177,8 @@ private DrillScanRel createNativeScanRel(final Map<String, 
String> partitionColM
             hiveScan.getUserName(),
             nativeScanCols,
             hiveScan.getStoragePlugin(),
-            logicalInputSplits);
+            logicalInputSplits,
+            hiveScan.getConfProperties());
 
     return new DrillScanRel(
         hiveScanRel.getCluster(),
@@ -194,7 +195,7 @@ private DrillScanRel createNativeScanRel(final Map<String, 
String> partitionColM
   private DrillProjectRel createProjectRel(final DrillScanRel hiveScanRel,
       final Map<String, String> partitionColMapping, final DrillScanRel 
nativeScanRel) {
 
-    final List<RexNode> rexNodes = Lists.newArrayList();
+    final List<RexNode> rexNodes = new ArrayList<>();
     final RexBuilder rb = hiveScanRel.getCluster().getRexBuilder();
     final RelDataType hiveScanRowType = hiveScanRel.getRowType();
 
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index e22701511a1..d334ec8b4fe 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -38,6 +38,7 @@
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 @JsonTypeName("hive-drill-native-parquet-row-group-scan")
 public class HiveDrillNativeParquetRowGroupScan extends 
AbstractParquetRowGroupScan {
@@ -45,6 +46,7 @@
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveStoragePluginConfig hiveStoragePluginConfig;
   private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetRowGroupScan(@JacksonInject 
StoragePluginRegistry registry,
@@ -53,12 +55,14 @@ public HiveDrillNativeParquetRowGroupScan(@JacksonInject 
StoragePluginRegistry r
                                             
@JsonProperty("rowGroupReadEntries") List<RowGroupReadEntry> 
rowGroupReadEntries,
                                             @JsonProperty("columns") 
List<SchemaPath> columns,
                                             
@JsonProperty("hivePartitionHolder") HivePartitionHolder hivePartitionHolder,
+                                            @JsonProperty("confProperties") 
Map<String, String> confProperties,
                                             @JsonProperty("filter") 
LogicalExpression filter) throws ExecutionSetupException {
     this(userName,
         (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig),
         rowGroupReadEntries,
         columns,
         hivePartitionHolder,
+        confProperties,
         filter);
   }
 
@@ -67,11 +71,13 @@ public HiveDrillNativeParquetRowGroupScan(String userName,
                                             List<RowGroupReadEntry> 
rowGroupReadEntries,
                                             List<SchemaPath> columns,
                                             HivePartitionHolder 
hivePartitionHolder,
+                                            Map<String, String> confProperties,
                                             LogicalExpression filter) {
     super(userName, rowGroupReadEntries, columns, filter);
     this.hiveStoragePlugin = Preconditions.checkNotNull(hiveStoragePlugin, 
"Could not find format config for the given configuration");
     this.hiveStoragePluginConfig = hiveStoragePlugin.getConfig();
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
   }
 
   @JsonProperty
@@ -84,6 +90,11 @@ public HivePartitionHolder getHivePartitionHolder() {
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getHiveStoragePlugin() {
     return hiveStoragePlugin;
@@ -92,7 +103,7 @@ public HiveStoragePlugin getHiveStoragePlugin() {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     Preconditions.checkArgument(children.isEmpty());
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, 
confProperties, filter);
   }
 
   @Override
@@ -102,7 +113,7 @@ public int getOperatorType() {
 
   @Override
   public AbstractParquetRowGroupScan copy(List<SchemaPath> columns) {
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, rowGroupReadEntries, columns, hivePartitionHolder, 
confProperties, filter);
   }
 
   @Override
@@ -114,7 +125,7 @@ public boolean areCorruptDatesAutoCorrected() {
   public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws 
IOException {
     Path path = new Path(rowGroupReadEntry.getPath()).getParent();
     return new ProjectionPusher().pushProjectionsAndFilters(
-        new JobConf(hiveStoragePlugin.getHiveConf()),
+        new 
JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), 
confProperties)),
         path.getParent());
   }
 
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 03a80d33861..a973fa12b82 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -62,7 +62,8 @@
 public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
 
   private final HiveStoragePlugin hiveStoragePlugin;
-  private HivePartitionHolder hivePartitionHolder;
+  private final HivePartitionHolder hivePartitionHolder;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveDrillNativeParquetScan(@JacksonInject StoragePluginRegistry 
engineRegistry,
@@ -71,10 +72,12 @@ public HiveDrillNativeParquetScan(@JacksonInject 
StoragePluginRegistry engineReg
                                     @JsonProperty("columns") List<SchemaPath> 
columns,
                                     @JsonProperty("entries") 
List<ReadEntryWithPath> entries,
                                     @JsonProperty("hivePartitionHolder") 
HivePartitionHolder hivePartitionHolder,
+                                    @JsonProperty("confProperties") 
Map<String, String> confProperties,
                                     @JsonProperty("filter") LogicalExpression 
filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, 
filter);
     this.hiveStoragePlugin = (HiveStoragePlugin) 
engineRegistry.getPlugin(hiveStoragePluginConfig);
     this.hivePartitionHolder = hivePartitionHolder;
+    this.confProperties = confProperties;
 
     init();
   }
@@ -82,19 +85,22 @@ public HiveDrillNativeParquetScan(@JacksonInject 
StoragePluginRegistry engineReg
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
-                                    List<LogicalInputSplit> 
logicalInputSplits) throws IOException {
-    this(userName, columns, hiveStoragePlugin, logicalInputSplits, 
ValueExpressions.BooleanExpression.TRUE);
+                                    List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties) throws 
IOException {
+    this(userName, columns, hiveStoragePlugin, logicalInputSplits, 
confProperties, ValueExpressions.BooleanExpression.TRUE);
   }
 
   public HiveDrillNativeParquetScan(String userName,
                                     List<SchemaPath> columns,
                                     HiveStoragePlugin hiveStoragePlugin,
                                     List<LogicalInputSplit> logicalInputSplits,
+                                    Map<String, String> confProperties,
                                     LogicalExpression filter) throws 
IOException {
     super(userName, columns, new ArrayList<>(), filter);
 
     this.hiveStoragePlugin = hiveStoragePlugin;
     this.hivePartitionHolder = new HivePartitionHolder();
+    this.confProperties = confProperties;
 
     for (LogicalInputSplit logicalInputSplit : logicalInputSplits) {
       Iterator<InputSplit> iterator = 
logicalInputSplit.getInputSplits().iterator();
@@ -122,6 +128,7 @@ private 
HiveDrillNativeParquetScan(HiveDrillNativeParquetScan that) {
     super(that);
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.hivePartitionHolder = that.hivePartitionHolder;
+    this.confProperties = that.confProperties;
   }
 
   @JsonProperty
@@ -134,6 +141,11 @@ public HivePartitionHolder getHivePartitionHolder() {
     return hivePartitionHolder;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @Override
   public SubScan getSpecificScan(int minorFragmentId) {
     List<RowGroupReadEntry> readEntries = getReadEntries(minorFragmentId);
@@ -142,7 +154,7 @@ public SubScan getSpecificScan(int minorFragmentId) {
       List<String> values = hivePartitionHolder.get(readEntry.getPath());
       subPartitionHolder.add(readEntry.getPath(), values);
     }
-    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, readEntries, columns, subPartitionHolder, filter);
+    return new HiveDrillNativeParquetRowGroupScan(getUserName(), 
hiveStoragePlugin, readEntries, columns, subPartitionHolder, confProperties, 
filter);
   }
 
   @Override
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 11d47f304cb..d631740155f 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -50,8 +50,6 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 
 import static 
org.apache.drill.exec.store.hive.DrillHiveMetaStoreClient.createPartitionWithSpecColumns;
 
@@ -64,6 +62,7 @@
   private final HiveStoragePlugin hiveStoragePlugin;
   private final HiveReadEntry hiveReadEntry;
   private final HiveMetadataProvider metadataProvider;
+  private final Map<String, String> confProperties;
 
   private List<List<LogicalInputSplit>> mappings;
   private List<LogicalInputSplit> inputSplits;
@@ -75,22 +74,24 @@ public HiveScan(@JsonProperty("userName") final String 
userName,
                   @JsonProperty("hiveReadEntry") final HiveReadEntry 
hiveReadEntry,
                   @JsonProperty("hiveStoragePluginConfig") final 
HiveStoragePluginConfig hiveStoragePluginConfig,
                   @JsonProperty("columns") final List<SchemaPath> columns,
+                  @JsonProperty("confProperties") final Map<String, String> 
confProperties,
                   @JacksonInject final StoragePluginRegistry pluginRegistry) 
throws ExecutionSetupException {
     this(userName,
         hiveReadEntry,
         (HiveStoragePlugin) pluginRegistry.getPlugin(hiveStoragePluginConfig),
         columns,
-        null);
+        null, confProperties);
   }
 
   public HiveScan(final String userName, final HiveReadEntry hiveReadEntry, 
final HiveStoragePlugin hiveStoragePlugin,
-      final List<SchemaPath> columns, final HiveMetadataProvider 
metadataProvider) throws ExecutionSetupException {
+                  final List<SchemaPath> columns, final HiveMetadataProvider 
metadataProvider, final Map<String, String> confProperties) throws 
ExecutionSetupException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
     if (metadataProvider == null) {
-      this.metadataProvider = new HiveMetadataProvider(userName, 
hiveReadEntry, hiveStoragePlugin.getHiveConf());
+      this.metadataProvider = new HiveMetadataProvider(userName, 
hiveReadEntry, getHiveConf());
     } else {
       this.metadataProvider = metadataProvider;
     }
@@ -102,10 +103,11 @@ public HiveScan(final HiveScan that) {
     this.hiveReadEntry = that.hiveReadEntry;
     this.hiveStoragePlugin = that.hiveStoragePlugin;
     this.metadataProvider = that.metadataProvider;
+    this.confProperties = that.confProperties;
   }
 
   public HiveScan clone(final HiveReadEntry hiveReadEntry) throws 
ExecutionSetupException {
-    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, 
columns, metadataProvider);
+    return new HiveScan(getUserName(), hiveReadEntry, hiveStoragePlugin, 
columns, metadataProvider, confProperties);
   }
 
   @JsonProperty
@@ -123,29 +125,37 @@ public HiveStoragePluginConfig 
getHiveStoragePluginConfig() {
     return columns;
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveStoragePlugin getStoragePlugin() {
     return hiveStoragePlugin;
   }
 
-  protected HiveMetadataProvider getMetadataProvider() {
-    return metadataProvider;
+  @JsonIgnore
+  public HiveConf getHiveConf() {
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), 
confProperties);
   }
 
-  private List<LogicalInputSplit> getInputSplits() {
-    if (inputSplits == null) {
-      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
-    }
-
-    return inputSplits;
+  @JsonIgnore
+  public boolean isNativeReader() {
+    return false;
   }
 
+  @Override
+  public boolean supportsPartitionFilterPushdown() {
+    List<FieldSchema> partitionKeys = 
hiveReadEntry.getTable().getPartitionKeys();
+    return !(partitionKeys == null || partitionKeys.size() == 0);
+  }
 
   @Override
   public void applyAssignments(final List<CoordinationProtos.DrillbitEndpoint> 
endpoints) {
     mappings = new ArrayList<>();
     for (int i = 0; i < endpoints.size(); i++) {
-      mappings.add(new ArrayList<LogicalInputSplit>());
+      mappings.add(new ArrayList<>());
     }
     final int count = endpoints.size();
     final List<LogicalInputSplit> inputSplits = getInputSplits();
@@ -158,9 +168,9 @@ public void applyAssignments(final 
List<CoordinationProtos.DrillbitEndpoint> end
   public SubScan getSpecificScan(final int minorFragmentId) throws 
ExecutionSetupException {
     try {
       final List<LogicalInputSplit> splits = mappings.get(minorFragmentId);
-      List<HivePartitionWrapper> parts = Lists.newArrayList();
-      final List<List<String>> encodedInputSplits = Lists.newArrayList();
-      final List<String> splitTypes = Lists.newArrayList();
+      List<HivePartitionWrapper> parts = new ArrayList<>();
+      final List<List<String>> encodedInputSplits = new ArrayList<>();
+      final List<String> splitTypes = new ArrayList<>();
       for (final LogicalInputSplit split : splits) {
         final Partition splitPartition = split.getPartition();
         if (splitPartition != null) {
@@ -176,7 +186,7 @@ public SubScan getSpecificScan(final int minorFragmentId) 
throws ExecutionSetupE
       }
 
       final HiveReadEntry subEntry = new 
HiveReadEntry(hiveReadEntry.getTableWrapper(), parts);
-      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, 
splitTypes, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), encodedInputSplits, subEntry, 
splitTypes, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
@@ -192,7 +202,7 @@ public int getMaxParallelizationWidth() {
     final Map<String, DrillbitEndpoint> endpointMap = new HashMap<>();
     for (final DrillbitEndpoint endpoint : 
hiveStoragePlugin.getContext().getBits()) {
       endpointMap.put(endpoint.getAddress(), endpoint);
-      logger.debug("endpoing address: {}", endpoint.getAddress());
+      logger.debug("Endpoint address: {}", endpoint.getAddress());
     }
     final Map<DrillbitEndpoint, EndpointAffinity> affinityMap = new 
HashMap<>();
     try {
@@ -204,7 +214,7 @@ public int getMaxParallelizationWidth() {
       for (final LogicalInputSplit split : inputSplits) {
         final float affinity = ((float) Math.max(1, split.getLength())) / 
totalSize;
         for (final String loc : split.getLocations()) {
-          logger.debug("split location: {}", loc);
+          logger.debug("Split location: {}", loc);
           final DrillbitEndpoint endpoint = endpointMap.get(loc);
           if (endpoint != null) {
             if (affinityMap.containsKey(endpoint)) {
@@ -218,13 +228,8 @@ public int getMaxParallelizationWidth() {
     } catch (final IOException e) {
       throw new DrillRuntimeException(e);
     }
-    for (final DrillbitEndpoint ep : affinityMap.keySet()) {
-      Preconditions.checkNotNull(ep);
-    }
-    for (final EndpointAffinity a : affinityMap.values()) {
-      Preconditions.checkNotNull(a.getEndpoint());
-    }
-    return Lists.newArrayList(affinityMap.values());
+
+    return new ArrayList<>(affinityMap.values());
   }
 
   @Override
@@ -243,21 +248,8 @@ public ScanStats getScanStats() {
     }
   }
 
-  protected int getSerDeOverheadFactor() {
-    final int projectedColumnCount;
-    if (Utilities.isStarQuery(columns)) {
-      Table hiveTable = hiveReadEntry.getTable();
-      projectedColumnCount = hiveTable.getSd().getColsSize() + 
hiveTable.getPartitionKeysSize();
-    } else {
-      // In cost estimation, # of project columns should be >= 1, even for 
skipAll query.
-      projectedColumnCount = Math.max(columns.size(), 1);
-    }
-
-    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
-  }
-
   @Override
-  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> 
children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(final List<PhysicalOperator> 
children) {
     return new HiveScan(this);
   }
 
@@ -275,6 +267,7 @@ public String toString() {
         + ", numPartitions=" + numPartitions
         + ", partitions= " + partitions
         + ", inputDirectories=" + 
metadataProvider.getInputDirectories(hiveReadEntry)
+        + ", confProperties=" + confProperties
         + "]";
   }
 
@@ -290,22 +283,24 @@ public boolean canPushdownProjects(final List<SchemaPath> 
columns) {
     return true;
   }
 
-  // Return true if the current table is partitioned false otherwise
-  public boolean supportsPartitionFilterPushdown() {
-    final List<FieldSchema> partitionKeys = 
hiveReadEntry.getTable().getPartitionKeys();
-    if (partitionKeys == null || partitionKeys.size() == 0) {
-      return false;
+  private List<LogicalInputSplit> getInputSplits() {
+    if (inputSplits == null) {
+      inputSplits = metadataProvider.getInputSplits(hiveReadEntry);
     }
-    return true;
-  }
 
-  @JsonIgnore
-  public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return inputSplits;
   }
 
-  @JsonIgnore
-  public boolean isNativeReader() {
-    return false;
+  private int getSerDeOverheadFactor() {
+    final int projectedColumnCount;
+    if (Utilities.isStarQuery(columns)) {
+      Table hiveTable = hiveReadEntry.getTable();
+      projectedColumnCount = hiveTable.getSd().getColsSize() + 
hiveTable.getPartitionKeysSize();
+    } else {
+      // In cost estimation, # of project columns should be >= 1, even for 
skipAll query.
+      projectedColumnCount = Math.max(columns.size(), 1);
+    }
+
+    return projectedColumnCount * HIVE_SERDE_SCAN_OVERHEAD_FACTOR_PER_COLUMN;
   }
 }
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index ced8b01e218..adf134843a4 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -18,27 +18,34 @@
 package org.apache.drill.exec.store.hive;
 
 import java.io.IOException;
+import java.io.StringReader;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableSet;
 
 import org.apache.calcite.schema.Schema.TableType;
 import org.apache.calcite.schema.SchemaPlus;
 
+import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
 import 
org.apache.drill.exec.planner.sql.logical.ConvertHiveMapRDBJsonScanToDrillMapRDBJsonScan;
 import 
org.apache.drill.exec.planner.sql.logical.ConvertHiveParquetScanToDrillParquetScan;
 import 
org.apache.drill.exec.planner.sql.logical.HivePushPartitionFilterIntoScan;
 import org.apache.drill.exec.server.DrillbitContext;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.AbstractStoragePlugin;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -53,17 +60,16 @@
 
 public class HiveStoragePlugin extends AbstractStoragePlugin {
 
-  static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
+  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveStoragePlugin.class);
 
   private final HiveStoragePluginConfig config;
   private HiveSchemaFactory schemaFactory;
   private final HiveConf hiveConf;
 
-  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext 
context, String name)
-      throws ExecutionSetupException {
+  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext 
context, String name) throws ExecutionSetupException {
     super(context, name);
     this.config = config;
-    this.hiveConf = createHiveConf(config.getHiveConfigOverride());
+    this.hiveConf = HiveUtilities.generateHiveConf(config.getConfigProps());
     this.schemaFactory = new HiveSchemaFactory(this, name, hiveConf);
   }
 
@@ -75,8 +81,18 @@ public HiveStoragePluginConfig getConfig() {
     return config;
   }
 
+  @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, 
SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS, 
options);
+  }
+
   @Override
   public HiveScan getPhysicalScan(String userName, JSONOptions selection, 
List<SchemaPath> columns) throws IOException {
+    return getPhysicalScan(userName, selection, columns, null);
+  }
+
+  @Override
+  public HiveScan getPhysicalScan(String userName, JSONOptions selection, 
List<SchemaPath> columns, SessionOptionManager options) throws IOException {
     HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), 
new TypeReference<HiveReadEntry>(){});
     try {
       if (hiveReadEntry.getJdbcTableType() == TableType.VIEW) {
@@ -84,7 +100,26 @@ public HiveScan getPhysicalScan(String userName, 
JSONOptions selection, List<Sch
             "Querying views created in Hive from Drill is not supported in 
current version.");
       }
 
-      return new HiveScan(userName, hiveReadEntry, this, columns, null);
+      Map<String, String> confProperties = new HashMap<>();
+      if (options != null) {
+        String value = 
StringEscapeUtils.unescapeJava(options.getString(ExecConstants.HIVE_CONF_PROPERTIES));
+        logger.trace("[{}] is set to {}.", ExecConstants.HIVE_CONF_PROPERTIES, 
value);
+        try {
+          Properties properties = new Properties();
+          properties.load(new StringReader(value));
+          confProperties =
+            properties.stringPropertyNames().stream()
+              .collect(
+                Collectors.toMap(
+                  Function.identity(),
+                  properties::getProperty,
+                  (o, n) -> n));
+          } catch (IOException e) {
+            logger.warn("Unable to parse Hive conf properties {}, ignoring 
them.", value);
+        }
+      }
+
+      return new HiveScan(userName, hiveReadEntry, this, columns, null, 
confProperties);
     } catch (ExecutionSetupException e) {
       throw new IOException(e);
     }
@@ -181,14 +216,4 @@ public synchronized void registerSchemas(SchemaConfig 
schemaConfig, SchemaPlus p
     return ruleBuilder.build();
   }
 
-  private static HiveConf createHiveConf(final Map<String, String> 
hiveConfigOverride) {
-    final HiveConf hiveConf = new HiveConf();
-    for(Entry<String, String> config : hiveConfigOverride.entrySet()) {
-      final String key = config.getKey();
-      final String value = config.getValue();
-      hiveConf.set(key, value);
-      logger.trace("HiveConfig Override {}={}", key, value);
-    }
-    return hiveConf;
-  }
 }
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index b6f15c81e34..d8124688846 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -19,28 +19,31 @@
 
 import java.util.Map;
 
+import com.fasterxml.jackson.annotation.JsonAlias;
 import org.apache.drill.common.logical.StoragePluginConfigBase;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 
 @JsonTypeName(HiveStoragePluginConfig.NAME)
 public class HiveStoragePluginConfig extends StoragePluginConfigBase {
-  @JsonProperty
-  public Map<String, String> configProps;
 
   public static final String NAME = "hive";
 
-  @JsonIgnore
-  public Map<String, String> getHiveConfigOverride() {
-    return configProps;
-  }
+  private final Map<String, String> configProps;
 
   @JsonCreator
-  public HiveStoragePluginConfig(@JsonProperty("config") Map<String, String> 
props) {
-    this.configProps = props;
+  public HiveStoragePluginConfig(@JsonProperty("configProps")
+                                 // previously two names were allowed due to 
incorrectly written ser / der logic
+                                 // allowing to use both during 
deserialization for backward compatibility
+                                 @JsonAlias("config") Map<String, String> 
configProps) {
+    this.configProps = configProps;
+  }
+
+  @JsonProperty
+  public Map<String, String> getConfigProps() {
+    return configProps;
   }
 
   @Override
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 8ca8647cb1d..0acec2db4ba 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.google.common.collect.ImmutableSet;
@@ -55,6 +56,7 @@
   private final HiveTableWithColumnCache table;
   private final List<HivePartition> partitions;
   private final List<SchemaPath> columns;
+  private final Map<String, String> confProperties;
 
   @JsonCreator
   public HiveSubScan(@JacksonInject StoragePluginRegistry registry,
@@ -63,22 +65,24 @@ public HiveSubScan(@JacksonInject StoragePluginRegistry 
registry,
                      @JsonProperty("hiveReadEntry") HiveReadEntry 
hiveReadEntry,
                      @JsonProperty("splitClasses") List<String> splitClasses,
                      @JsonProperty("columns") List<SchemaPath> columns,
-                     @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig)
+                     @JsonProperty("hiveStoragePluginConfig") 
HiveStoragePluginConfig hiveStoragePluginConfig,
+                     @JsonProperty("confProperties") Map<String, String> 
confProperties)
       throws IOException, ExecutionSetupException, 
ReflectiveOperationException {
     this(userName,
         splits,
         hiveReadEntry,
         splitClasses,
         columns,
-        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig));
+        (HiveStoragePlugin) registry.getPlugin(hiveStoragePluginConfig), 
confProperties);
   }
 
   public HiveSubScan(final String userName,
                      final List<List<String>> splits,
                      final HiveReadEntry hiveReadEntry,
-                      final List<String> splitClasses,
+                     final List<String> splitClasses,
                      final List<SchemaPath> columns,
-                     final HiveStoragePlugin hiveStoragePlugin)
+                     final HiveStoragePlugin hiveStoragePlugin,
+                     final Map<String, String> confProperties)
     throws IOException, ReflectiveOperationException {
     super(userName);
     this.hiveReadEntry = hiveReadEntry;
@@ -88,6 +92,7 @@ public HiveSubScan(final String userName,
     this.splitClasses = splitClasses;
     this.columns = columns;
     this.hiveStoragePlugin = hiveStoragePlugin;
+    this.confProperties = confProperties;
 
     for (int i = 0; i < splits.size(); i++) {
       inputSplits.add(deserializeInputSplit(splits.get(i), 
splitClasses.get(i)));
@@ -119,6 +124,11 @@ public HiveStoragePluginConfig 
getHiveStoragePluginConfig() {
     return hiveStoragePlugin.getConfig();
   }
 
+  @JsonProperty
+  public Map<String, String> getConfProperties() {
+    return confProperties;
+  }
+
   @JsonIgnore
   public HiveTableWithColumnCache getTable() {
     return table;
@@ -141,7 +151,7 @@ public HiveStoragePlugin getStoragePlugin() {
 
   @JsonIgnore
   public HiveConf getHiveConf() {
-    return hiveStoragePlugin.getHiveConf();
+    return HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), 
confProperties);
   }
 
   @Override
@@ -152,7 +162,7 @@ public HiveConf getHiveConf() {
   @Override
   public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) 
throws ExecutionSetupException {
     try {
-      return new HiveSubScan(getUserName(), splits, hiveReadEntry, 
splitClasses, columns, hiveStoragePlugin);
+      return new HiveSubScan(getUserName(), splits, hiveReadEntry, 
splitClasses, columns, hiveStoragePlugin, confProperties);
     } catch (IOException | ReflectiveOperationException e) {
       throw new ExecutionSetupException(e);
     }
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index c8efb650a99..6fc567ee196 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -93,7 +93,14 @@
 public class HiveUtilities {
   private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(HiveUtilities.class);
 
-  /** Partition value is received in string format. Convert it into 
appropriate object based on the type. */
+  /**
+   * Partition value is received in string format. Convert it into appropriate 
object based on the type.
+   *
+   * @param typeInfo type info
+   * @param value partition value
+   * @param defaultPartitionValue default partition value
+   * @return converted object
+   */
   public static Object convertPartitionType(TypeInfo typeInfo, String value, 
final String defaultPartitionValue) {
     if (typeInfo.getCategory() != Category.PRIMITIVE) {
       // In Hive only primitive types are allowed as partition column types.
@@ -147,6 +154,15 @@ public static Object convertPartitionType(TypeInfo 
typeInfo, String value, final
     return null;
   }
 
+  /**
+   * Populates vector with given value based on its type.
+   *
+   * @param vector vector instance
+   * @param managedBuffer Drill buffer
+   * @param val value
+   * @param start start position
+   * @param end end position
+   */
   public static void populateVector(final ValueVector vector, final DrillBuf 
managedBuffer, final Object val,
       final int start, final int end) {
     TypeProtos.MinorType type = vector.getField().getType().getMinorType();
@@ -307,6 +323,13 @@ public static void populateVector(final ValueVector 
vector, final DrillBuf manag
     }
   }
 
+  /**
+   * Obtains major type from given type info holder.
+   *
+   * @param typeInfo type info holder
+   * @param options session options
+   * @return appropriate major type, null otherwise. For some types may throw 
unsupported exception.
+   */
   public static MajorType getMajorTypeFromHiveTypeInfo(final TypeInfo 
typeInfo, final OptionSet options) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE: {
@@ -343,8 +366,14 @@ public static MajorType getMajorTypeFromHiveTypeInfo(final 
TypeInfo typeInfo, fi
     return null;
   }
 
-  public static TypeProtos.MinorType 
getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo,
-                                                                           
OptionSet options) {
+  /**
+   * Obtains minor type from given primitive type info holder.
+   *
+   * @param primitiveTypeInfo primitive type info holder
+   * @param options session options
+   * @return appropriate minor type, otherwise throws unsupported type 
exception
+   */
+  public static TypeProtos.MinorType 
getMinorTypeFromHivePrimitiveTypeInfo(PrimitiveTypeInfo primitiveTypeInfo, 
OptionSet options) {
     switch(primitiveTypeInfo.getPrimitiveCategory()) {
       case BINARY:
         return TypeProtos.MinorType.VARBINARY;
@@ -392,10 +421,8 @@ public static MajorType getMajorTypeFromHiveTypeInfo(final 
TypeInfo typeInfo, fi
    * @param job {@link JobConf} instance needed incase the table is 
StorageHandler based table.
    * @param sd {@link StorageDescriptor} instance of currently reading 
partition or table (for non-partitioned tables).
    * @param table Table object
-   * @throws Exception
    */
-  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final 
JobConf job, final StorageDescriptor sd,
-      final Table table) throws Exception {
+  public static Class<? extends InputFormat<?, ?>> getInputFormatClass(final 
JobConf job, final StorageDescriptor sd, final Table table) throws Exception {
     final String inputFormatName = sd.getInputFormat();
     if (Strings.isNullOrEmpty(inputFormatName)) {
       final String storageHandlerClass = 
table.getParameters().get(META_TABLE_STORAGE);
@@ -426,26 +453,23 @@ public static void addConfToJob(final JobConf job, final 
Properties properties)
   }
 
   /**
-   * Wrapper around {@link MetaStoreUtils#getPartitionMetadata(Partition, 
Table)} which also adds parameters from table
-   * to properties returned by {@link 
MetaStoreUtils#getPartitionMetadata(Partition, Table)}.
+   * Wrapper around {@link 
MetaStoreUtils#getPartitionMetadata(org.apache.hadoop.hive.metastore.api.Partition,
 Table)}
+   * which also adds parameters from table to properties returned by that 
method.
    *
    * @param partition the source of partition level parameters
    * @param table     the source of table level parameters
    * @return properties
    */
   public static Properties getPartitionMetadata(final HivePartition partition, 
final HiveTableWithColumnCache table) {
-    final Properties properties;
     restoreColumns(table, partition);
-    properties = MetaStoreUtils.getPartitionMetadata(partition, table);
+    Properties properties = MetaStoreUtils.getPartitionMetadata(partition, 
table);
 
     // SerDe expects properties from Table, but above call doesn't add Table 
properties.
     // Include Table properties in final list in order to not to break SerDes 
that depend on
     // Table properties. For example AvroSerDe gets the schema from properties 
(passed as second argument)
-    for (Map.Entry<String, String> entry : table.getParameters().entrySet()) {
-      if (entry.getKey() != null && entry.getKey() != null) {
-        properties.put(entry.getKey(), entry.getValue());
-      }
-    }
+    table.getParameters().entrySet().stream()
+        .filter(e -> e.getKey() != null && e.getValue() != null)
+        .forEach(e -> properties.put(e.getKey(), e.getValue()));
 
     return properties;
   }
@@ -453,8 +477,8 @@ public static Properties getPartitionMetadata(final 
HivePartition partition, fin
   /**
    * Sets columns from table cache to table and partition.
    *
+   * @param table the source of column lists cache
    * @param partition partition which will set column list
-   * @param table     the source of column lists cache
    */
   public static void restoreColumns(HiveTableWithColumnCache table, 
HivePartition partition) {
     // exactly the same column lists for partitions or table
@@ -471,6 +495,9 @@ public static void restoreColumns(HiveTableWithColumnCache 
table, HivePartition
    * Wrapper around {@link MetaStoreUtils#getSchema(StorageDescriptor, 
StorageDescriptor, Map, String, String, List)}
    * which also sets columns from table cache to table and returns properties 
returned by
    * {@link MetaStoreUtils#getSchema(StorageDescriptor, StorageDescriptor, 
Map, String, String, List)}.
+   *
+   * @param table Hive table with cached columns
+   * @return Hive table metadata
    */
   public static Properties getTableMetadata(HiveTableWithColumnCache table) {
     restoreColumns(table, null);
@@ -478,13 +505,18 @@ public static Properties 
getTableMetadata(HiveTableWithColumnCache table) {
       table.getDbName(), table.getTableName(), table.getPartitionKeys());
   }
 
+  /**
+   * Generates unsupported types exception message with list of supported types
+   * and throws user exception.
+   *
+   * @param unsupportedType unsupported type
+   */
   public static void throwUnsupportedHiveDataTypeError(String unsupportedType) 
{
-    StringBuilder errMsg = new StringBuilder();
-    errMsg.append(String.format("Unsupported Hive data type %s. ", 
unsupportedType));
-    errMsg.append(System.getProperty("line.separator"));
-    errMsg.append("Following Hive data types are supported in Drill for 
querying: ");
-    errMsg.append(
-        "BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, 
TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
+    StringBuilder errMsg = new StringBuilder()
+        .append("Unsupported Hive data type 
").append(unsupportedType).append(". ")
+        .append(System.lineSeparator())
+        .append("Following Hive data types are supported in Drill for 
querying: ")
+        .append("BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, FLOAT, DOUBLE, DATE, 
TIMESTAMP, BINARY, DECIMAL, STRING, VARCHAR and CHAR");
 
     throw UserException.unsupportedError()
         .message(errMsg.toString())
@@ -633,7 +665,7 @@ public static boolean 
nativeReadersRuleMatches(RelOptRuleCall call, Class tableI
   }
 
   /**
-   * Get the input format from given {@link StorageDescriptor}
+   * Get the input format from given {@link StorageDescriptor}.
    *
    * @param properties table properties
    * @param hiveReadEntry hive read entry
@@ -681,5 +713,35 @@ public static boolean containsUnsupportedDataTypes(final 
Table hiveTable) {
     }
     return false;
   }
+
+  /**
+   * Creates HiveConf based on given list of configuration properties.
+   *
+   * @param properties config properties
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(Map<String, String> properties) {
+    logger.trace("Override HiveConf with the following properties {}", 
properties);
+    HiveConf hiveConf = new HiveConf();
+    properties.forEach(hiveConf::set);
+    return hiveConf;
+  }
+
+  /**
+   * Creates HiveConf based on properties in given HiveConf and configuration 
properties.
+   *
+   * @param hiveConf hive conf
+   * @param properties config properties
+   * @return instance of HiveConf
+   */
+  public static HiveConf generateHiveConf(HiveConf hiveConf, Map<String, 
String> properties) {
+    Properties changedProperties = hiveConf.getChangedProperties();
+    changedProperties.putAll(properties);
+    HiveConf newHiveConf = new HiveConf();
+    changedProperties.stringPropertyNames()
+        .forEach(name -> newHiveConf.set(name, 
changedProperties.getProperty(name)));
+    return newHiveConf;
+  }
+
 }
 
diff --git 
a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json 
b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
index d06220fe7ca..018189c382d 100644
--- 
a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
+++ 
b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json
@@ -2,7 +2,7 @@
   "storage":{
     hive : {
       type:"hive",
-      config : {
+      configProps : {
         "hive.metastore.uris" : "",
         "javax.jdo.option.ConnectionURL" : 
"jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true",
         "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh",
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
index 556deb2a2f5..ea8d5df8412 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/TestHiveDrillNativeParquetReader.java
@@ -152,6 +152,12 @@ public void testPhysicalPlanSubmission() throws Exception {
     // checks only group scan
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv_native");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv_native_ext");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, 
"hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
   }
 
   @Test
@@ -243,4 +249,15 @@ public void 
testNativeReaderIsDisabledForAlteredPartitionedTable() throws Except
     testPlanMatchingPatterns(query, new String[] {"HiveScan"}, new 
String[]{"HiveDrillNativeParquetScan"});
   }
 
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, 
"hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
 }
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 25393e7363a..94f39b806d8 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -412,6 +412,27 @@ public void testNonAsciiStringLiterals() throws Exception {
   public void testPhysicalPlanSubmission() throws Exception {
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.kv");
     PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.readtest");
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, 
"hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from 
hive.sub_dir_table");
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+  }
+
+  @Test
+  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
+    String query = "select * from hive.sub_dir_table";
+    try {
+      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, 
"hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
+      test(query);
+    } finally {
+      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
+    }
+
+    thrown.expect(UserRemoteException.class);
+    thrown.expectMessage(containsString("IOException: Not a file"));
+    test(query);
   }
 
   private void verifyColumnsMetadata(List<UserProtos.ResultColumnMetadata> 
columnsList, Map<String, Integer> expectedResult) {
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
index 80da976d443..c5c0d488c76 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java
@@ -50,6 +50,7 @@ public void showTablesFromDb() throws Exception{
         .baselineValues("hive.default", "partition_with_few_schemas")
         .baselineValues("hive.default", "kv_native")
         .baselineValues("hive.default", "kv_native_ext")
+        .baselineValues("hive.default", "sub_dir_table")
         .go();
 
     testBuilder()
@@ -254,6 +255,7 @@ public void showInfoSchema() throws Exception {
         .baselineValues("DRILL", "hive.default", "simple_json", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native", "TABLE")
         .baselineValues("DRILL", "hive.default", "kv_native_ext", "TABLE")
+        .baselineValues("DRILL", "hive.default", "sub_dir_table", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_small", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_text_large", "TABLE")
         .baselineValues("DRILL", "hive.skipper", "kv_incorrect_skip_header", 
"TABLE")
diff --git 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
index f2069999849..074cb3b83a0 100644
--- 
a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
+++ 
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java
@@ -25,6 +25,7 @@
 import java.nio.file.attribute.PosixFilePermission;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,7 +43,6 @@
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.session.SessionState;
 
-import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.serde.serdeConstants;
 
 import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery;
@@ -80,7 +80,7 @@ private HiveTestDataGenerator(final String dbDir, final 
String whDir, final Base
     this.whDir = whDir;
     this.dirTestWatcher = dirTestWatcher;
 
-    config = Maps.newHashMap();
+    config = new HashMap<>();
     config.put(ConfVars.METASTOREURIS.toString(), "");
     config.put("javax.jdo.option.ConnectionURL", 
String.format("jdbc:derby:;databaseName=%s;create=true", dbDir));
     config.put("hive.metastore.warehouse.dir", whDir);
@@ -89,7 +89,9 @@ private HiveTestDataGenerator(final String dbDir, final 
String whDir, final Base
 
   /**
    * Add Hive test storage plugin to the given plugin registry.
-   * @throws Exception
+   *
+   * @param pluginRegistry storage plugin registry
+   * @throws Exception if unable to update Hive storage plugin
    */
   public void addHiveTestPlugin(final StoragePluginRegistry pluginRegistry) 
throws Exception {
     HiveStoragePluginConfig pluginConfig = new HiveStoragePluginConfig(config);
@@ -101,7 +103,8 @@ public void addHiveTestPlugin(final StoragePluginRegistry 
pluginRegistry) throws
   /**
    * Update the current HiveStoragePlugin in given plugin registry with given 
<i>configOverride</i>.
    *
-   * @param configOverride
+   * @param pluginRegistry storage plugin registry
+   * @param configOverride config properties to be overridden
    * @throws DrillException if fails to update or no Hive plugin currently 
exists in given plugin registry.
    */
   public void updatePluginConfig(final StoragePluginRegistry pluginRegistry, 
Map<String, String> configOverride)
@@ -113,7 +116,7 @@ public void updatePluginConfig(final StoragePluginRegistry 
pluginRegistry, Map<S
     }
 
     HiveStoragePluginConfig newPluginConfig = storagePlugin.getConfig();
-    newPluginConfig.getHiveConfigOverride().putAll(configOverride);
+    newPluginConfig.getConfigProps().putAll(configOverride);
 
     pluginRegistry.createOrUpdate(HIVE_TEST_PLUGIN_NAME, newPluginConfig, 
true);
   }
@@ -344,7 +347,7 @@ private void generateTestData() throws Exception {
         "charType CHAR(10))"
     );
 
-    /**
+    /*
      * Create a PARQUET table with all supported types.
      */
     executeQuery(hiveDriver,
@@ -542,6 +545,8 @@ private void generateTestData() throws Exception {
 
     createTestDataForDrillNativeParquetReaderTests(hiveDriver);
 
+    createSubDirTable(hiveDriver, testDataFile);
+
     ss.close();
   }
 
@@ -594,56 +599,61 @@ private void 
createTestDataForDrillNativeParquetReaderTests(Driver hiveDriver) {
       "location '%s'", thirdPartition));
   }
 
+  private void createSubDirTable(Driver hiveDriver, String testDataFile) {
+    String tableName = "sub_dir_table";
+    dirTestWatcher.copyResourceToRoot(Paths.get(testDataFile), 
Paths.get(tableName, "sub_dir", "data.txt"));
+
+    String tableLocation = 
Paths.get(dirTestWatcher.getRootDir().toURI().getPath(), 
tableName).toUri().getPath();
+
+    String tableDDL = String.format("create external table sub_dir_table (key 
int, value string) " +
+        "row format delimited fields terminated by ',' stored as textfile 
location '%s'", tableLocation);
+    executeQuery(hiveDriver, tableDDL);
+  }
+
   private File getTempFile() throws Exception {
     return java.nio.file.Files.createTempFile("drill-hive-test", 
".txt").toFile();
   }
 
   private String generateTestDataFile() throws Exception {
-    final File file = getTempFile();
-    PrintWriter printWriter = new PrintWriter(file);
-    for (int i=1; i<=5; i++) {
-      printWriter.println (String.format("%d, key_%d", i, i));
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      for (int i = 1; i <= 5; i++) {
+        printWriter.println(String.format("%d, key_%d", i, i));
+      }
     }
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateTestDataFileForPartitionInput() throws Exception {
-    final File file = getTempFile();
-
-    PrintWriter printWriter = new PrintWriter(file);
-
-    String partValues[] = {"1", "2", "null"};
-
-    for(int c = 0; c < partValues.length; c++) {
-      for(int d = 0; d < partValues.length; d++) {
-        for(int e = 0; e < partValues.length; e++) {
-          for (int i = 1; i <= 5; i++) {
-            Date date = new Date(System.currentTimeMillis());
-            Timestamp ts = new Timestamp(System.currentTimeMillis());
-            printWriter.printf("%s,%s,%s,%s,%s",
-                date.toString(), ts.toString(), partValues[c], partValues[d], 
partValues[e]);
-            printWriter.println();
+    File file = getTempFile();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      String partValues[] = {"1", "2", "null"};
+      for (String partValue : partValues) {
+        for (String partValue1 : partValues) {
+          for (String partValue2 : partValues) {
+            for (int i = 1; i <= 5; i++) {
+              Date date = new Date(System.currentTimeMillis());
+              Timestamp ts = new Timestamp(System.currentTimeMillis());
+              printWriter.printf("%s,%s,%s,%s,%s", date.toString(), 
ts.toString(), partValue, partValue1, partValue2);
+              printWriter.println();
+            }
           }
         }
       }
     }
 
-    printWriter.close();
-
     return file.getPath();
   }
 
   private String generateAllTypesDataFile() throws Exception {
     File file = getTempFile();
 
-    PrintWriter printWriter = new PrintWriter(file);
-    
printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"
 +
-        
"89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"
 +
-        "2013-07-05 17:01:00,2013-07-05,charfield");
-    printWriter.println(",,,,,,,,,,,,,,,,");
-    printWriter.close();
+    try (PrintWriter printWriter = new PrintWriter(file)) {
+      
printWriter.println("YmluYXJ5ZmllbGQ=,false,34,65.99,2347.923,2758725827.9999,29375892739852.7689,"+
+          
"89853749534593985.7834783,8.345,4.67,123456,234235,3455,stringfield,varcharfield,"+
+          "2013-07-05 17:01:00,2013-07-05,charfield");
+      printWriter.println(",,,,,,,,,,,,,,,,");
+    }
 
     return file.getPath();
   }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 49f149b37ae..4c840a41398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -410,6 +410,9 @@ private ExecConstants() {
   public static final OptionValidator 
HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR =
       new BooleanValidator(HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER);
 
+  public static final String HIVE_CONF_PROPERTIES = 
"store.hive.conf.properties";
+  public static final OptionValidator HIVE_CONF_PROPERTIES_VALIDATOR = new 
StringValidator(HIVE_CONF_PROPERTIES);
+
   public static final String SLICE_TARGET = "planner.slice_target";
   public static final long SLICE_TARGET_DEFAULT = 100000l;
   public static final PositiveLongValidator SLICE_TARGET_OPTION = new 
PositiveLongValidator(SLICE_TARGET, Long.MAX_VALUE);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 36e74a09424..e7518b8ba02 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,8 +17,6 @@
  */
 package org.apache.drill.exec.opt;
 
-import com.google.common.collect.Lists;
-
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -28,9 +26,7 @@
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.GroupingAggregate;
 import org.apache.drill.common.logical.data.Join;
-import org.apache.drill.common.logical.data.JoinCondition;
 import org.apache.drill.common.logical.data.LogicalOperator;
-import org.apache.drill.common.logical.data.NamedExpression;
 import org.apache.drill.common.logical.data.Order;
 import org.apache.drill.common.logical.data.Order.Ordering;
 import org.apache.drill.common.logical.data.Project;
@@ -53,6 +49,7 @@
 import org.apache.drill.exec.physical.config.SelectionVectorRemover;
 import org.apache.drill.exec.physical.config.Sort;
 import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.UnnestPOP;
 import org.apache.drill.exec.physical.config.WindowPOP;
 import org.apache.drill.exec.rpc.UserClientConnection;
 import org.apache.drill.exec.server.options.OptionManager;
@@ -64,6 +61,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class BasicOptimizer extends Optimizer {
 //  private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -99,8 +97,7 @@ public PhysicalPlan optimize(final OptimizationContext 
context, final LogicalPla
         .version(logicalProperties.version)
         .generator(logicalProperties.generator)
         .options(new 
JSONOptions(context.getOptions().getOptionList())).build();
-    final PhysicalPlan p = new PhysicalPlan(props, physOps);
-    return p;
+    return new PhysicalPlan(props, physOps);
   }
 
   public static class BasicOptimizationContext implements OptimizationContext {
@@ -128,30 +125,28 @@ public OptionManager getOptions() {
      */
     private final LogicalPlan logicalPlan;
 
-    public LogicalConverter(final LogicalPlan logicalPlan) {
+    LogicalConverter(final LogicalPlan logicalPlan) {
       this.logicalPlan = logicalPlan;
     }
 
     @Override
     public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, 
Object value) throws OptimizerException {
-      final List<Ordering> orderDefs = Lists.newArrayList();
       PhysicalOperator input = groupBy.getInput().accept(this, value);
 
       if (groupBy.getKeys().size() > 0) {
-        for(NamedExpression e : groupBy.getKeys()) {
-          orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), 
NullDirection.FIRST));
-        }
+        List<Ordering> orderDefs = groupBy.getKeys().stream()
+            .map(e -> new Ordering(Direction.ASCENDING, e.getExpr(), 
NullDirection.FIRST))
+            .collect(Collectors.toList());
         input = new Sort(input, orderDefs, false);
       }
 
-      final StreamingAggregate sa = new StreamingAggregate(input, 
groupBy.getKeys(), groupBy.getExprs(), 1.0f);
-      return sa;
+      return new StreamingAggregate(input, groupBy.getKeys(), 
groupBy.getExprs(), 1.0f);
     }
 
     @Override
     public PhysicalOperator visitWindow(final Window window, final Object 
value) throws OptimizerException {
       PhysicalOperator input = window.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
+      final List<Ordering> ods = new ArrayList<>();
 
       input = new Sort(input, ods, false);
 
@@ -162,11 +157,7 @@ public PhysicalOperator visitWindow(final Window window, 
final Object value) thr
     @Override
     public PhysicalOperator visitOrder(final Order order, final Object value) 
throws OptimizerException {
       final PhysicalOperator input = order.getInput().accept(this, value);
-      final List<Ordering> ods = Lists.newArrayList();
-      for (Ordering o : order.getOrderings()){
-        ods.add(o);
-      }
-
+      final List<Ordering> ods = new ArrayList<>(order.getOrderings());
       return new SelectionVectorRemover(new Sort(input, ods, false));
     }
 
@@ -180,18 +171,20 @@ public PhysicalOperator visitLimit(final 
org.apache.drill.common.logical.data.Li
     @Override
     public PhysicalOperator visitJoin(final Join join, final Object value) 
throws OptimizerException {
       PhysicalOperator leftOp = join.getLeft().accept(this, value);
-      final List<Ordering> leftOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
-      }
+
+      List<Ordering> leftOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getLeft()))
+          .collect(Collectors.toList());
+
       leftOp = new Sort(leftOp, leftOrderDefs, false);
       leftOp = new SelectionVectorRemover(leftOp);
 
       PhysicalOperator rightOp = join.getRight().accept(this, value);
-      final List<Ordering> rightOrderDefs = Lists.newArrayList();
-      for(JoinCondition jc : join.getConditions()){
-        rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
-      }
+
+      List<Ordering> rightOrderDefs = join.getConditions().stream()
+          .map(jc -> new Ordering(Direction.ASCENDING, jc.getRight()))
+          .collect(Collectors.toList());
+
       rightOp = new Sort(rightOp, rightOrderDefs, false);
       rightOp = new SelectionVectorRemover(rightOp);
 
@@ -210,7 +203,7 @@ public PhysicalOperator visitScan(final Scan scan, final 
Object obj) throws Opti
       try {
         final StoragePlugin storagePlugin = 
queryContext.getStorage().getPlugin(config);
         final String user = 
userSession.getSession().getCredentials().getUserName();
-        return storagePlugin.getPhysicalScan(user, scan.getSelection());
+        return storagePlugin.getPhysicalScan(user, scan.getSelection(), 
userSession.getSession().getOptions());
       } catch (IOException | ExecutionSetupException e) {
         throw new OptimizerException("Failure while attempting to retrieve 
storage engine.", e);
       }
@@ -241,8 +234,8 @@ public PhysicalOperator visitFilter(final Filter filter, 
final Object obj) throw
     }
 
     @Override
-    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) 
throws OptimizerException {
-      return new org.apache.drill.exec.physical.config.UnnestPOP(null, 
unnest.getColumn());
+    public PhysicalOperator visitUnnest(final Unnest unnest, final Object obj) 
{
+      return new UnnestPOP(null, unnest.getColumn());
     }
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index 4ee767146c1..53036f1f4f7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -32,6 +32,7 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 import org.apache.drill.exec.store.StoragePlugin;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.util.ImpersonationUtil;
@@ -45,6 +46,7 @@
   private final StoragePlugin plugin;
   private final String userName;
   private GroupScan scan;
+  private SessionOptionManager options;
 
   /**
    * Creates a DrillTable instance for a @{code TableType#Table} table.
@@ -85,12 +87,16 @@ public DrillTable(String storageEngineName, StoragePlugin 
plugin, Object selecti
     this(storageEngineName, plugin, ImpersonationUtil.getProcessUserName(), 
selection);
   }
 
+  public void setOptions(SessionOptionManager options) {
+    this.options = options;
+  }
+
   public GroupScan getGroupScan() throws IOException{
     if (scan == null) {
       if (selection instanceof FileSelection && ((FileSelection) 
selection).isEmptyDirectory()) {
         this.scan = new SchemalessScan(userName, ((FileSelection) 
selection).getSelectionRoot());
       } else {
-        this.scan = plugin.getPhysicalScan(userName, new 
JSONOptions(selection));
+        this.scan = plugin.getPhysicalScan(userName, new 
JSONOptions(selection), options);
       }
     }
     return scan;
@@ -138,7 +144,8 @@ public boolean rolledUpColumnValidInsideAgg(String column,
     return true;
   }
 
-  @Override public boolean isRolledUp(String column) {
+  @Override
+  public boolean isRolledUp(String column) {
     return false;
   }
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index 3f65ad2c20c..d6b0951403c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -78,6 +78,7 @@
 import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.logical.DrillConstExecutor;
 import org.apache.drill.exec.planner.logical.DrillRelFactories;
+import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.physical.DrillDistributionTraitDef;
 import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.rpc.user.UserSession;
@@ -113,7 +114,6 @@
   private final DrillConfig drillConfig;
   private RelOptCluster cluster;
 
-  private String sql;
   private VolcanoPlanner planner;
   private boolean useRootSchema = false;
 
@@ -187,12 +187,10 @@ public SqlNode parse(String sql) {
 
   public SqlNode validate(final SqlNode parsedNode) {
     try {
-      SqlNode validatedNode = validator.validate(parsedNode);
-      return validatedNode;
+      return validator.validate(parsedNode);
     } catch (RuntimeException e) {
       UserException.Builder builder = UserException
-          .validationError(e)
-          .addContext("SQL Query", sql);
+          .validationError(e);
       if (isInnerQuery) {
         builder.message("Failure validating a view your query is dependent 
upon.");
       }
@@ -240,7 +238,7 @@ public void useRootSchemaAsDefault(boolean useRoot) {
 
   private class DrillValidator extends SqlValidatorImpl {
 
-    protected DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader 
catalogReader,
+    DrillValidator(SqlOperatorTable opTab, SqlValidatorCatalogReader 
catalogReader,
         RelDataTypeFactory typeFactory, SqlConformance conformance) {
       super(opTab, catalogReader, typeFactory, conformance);
     }
@@ -382,15 +380,11 @@ public RelRoot toRel(final SqlNode validatedNode) {
 
     //To avoid unexpected column errors set a value of top to false
     final RelRoot rel = sqlToRelConverter.convertQuery(validatedNode, false, 
false);
-    final RelRoot rel2 = rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, 
true));
-    return rel2;
+    return rel.withRel(sqlToRelConverter.flattenTypes(rel.rel, true));
   }
 
   private class Expander implements RelOptTable.ViewExpander {
 
-    public Expander() {
-    }
-
     @Override
     public RelRoot expandView(RelDataType rowType, String queryString, 
List<String> schemaPath, List<String> viewPath) {
       final DrillCalciteCatalogReader catalogReader = new 
DrillCalciteCatalogReader(
@@ -550,7 +544,7 @@ public RexNode ensureType(
      * also preserving nullability.
      *
      * <p>Tries to expand the cast, and therefore the result may be something
-     * other than a {@link RexCall} to the CAST operator, such as a
+     * other than a {@link org.apache.calcite.rex.RexCall} to the CAST 
operator, such as a
      * {@link RexLiteral} if {@code matchNullability} is false.
      *
      * @param type             Type to cast to
@@ -611,7 +605,7 @@ public RexNode makeCast(RelDataType type, RexNode exp, 
boolean matchNullability)
     /**
      * Disallow temporary tables presence in sql statement (ex: in view 
definitions)
      */
-    public void disallowTemporaryTables() {
+    void disallowTemporaryTables() {
       this.allowTemporaryTables = false;
     }
 
@@ -647,13 +641,19 @@ public void disallowTemporaryTables() {
         }
       }
 
-      return super.getTable(names);
+      Prepare.PreparingTable table = super.getTable(names);
+      DrillTable unwrap;
+      // add session options if found table is Drill table
+      if (table != null && (unwrap = table.unwrap(DrillTable.class)) != null) {
+        unwrap.setOptions(session.getOptions());
+      }
+      return table;
     }
 
     @Override
     public List<List<String>> getSchemaPaths() {
       if (useRootSchema) {
-        return ImmutableList.<List<String>>of(ImmutableList.<String>of());
+        return ImmutableList.of(ImmutableList.of());
       }
       return super.getSchemaPaths();
     }
@@ -662,8 +662,8 @@ public void disallowTemporaryTables() {
      * check if the schema provided is a valid schema:
      * <li>schema is not indicated (only one element in the names list)<li/>
      *
-     * @param names             list of schema and table names, table name is 
always the last element
-     * @return throws a userexception if the schema is not valid.
+     * @param names list of schema and table names, table name is always the 
last element
+     * @throws UserException if the schema is not valid.
      */
     private void isValidSchema(final List<String> names) throws UserException {
       SchemaPlus defaultSchema = session.getDefaultSchema(this.rootSchema);
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index a9c47428168..a16bb4d95cd 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,11 +17,14 @@
  */
 package org.apache.drill.exec.server.options;
 
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.drill.common.config.DrillConfig;
@@ -40,15 +43,13 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * {@link OptionManager} that holds options within {@link 
org.apache.drill.exec.server.DrillbitContext}.
- * Only one instance of this class exists per drillbit. Options set at the 
system level affect the entire system and
- * persist between restarts.
- */
 
 /**
+ *  <p> {@link OptionManager} that holds options within {@link 
org.apache.drill.exec.server.DrillbitContext}.
+ *  Only one instance of this class exists per drillbit. Options set at the 
system level affect the entire system and
+ *  persist between restarts.
+ *  </p>
+ *
  *  <p> All the system options are externalized into conf file. While adding a 
new system option
  *  a validator should be added and the default value for the option should be 
set in
  *  the conf files(example : drill-module.conf) under the namespace 
drill.exec.options.
@@ -173,6 +174,7 @@
       new 
OptionDefinition(ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS_VALIDATOR),
       new 
OptionDefinition(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER_VALIDATOR),
       new 
OptionDefinition(ExecConstants.HIVE_OPTIMIZE_MAPRDB_JSON_SCAN_WITH_NATIVE_READER_VALIDATOR),
+      new OptionDefinition(ExecConstants.HIVE_CONF_PROPERTIES_VALIDATOR),
       new OptionDefinition(ExecConstants.SLICE_TARGET_OPTION),
       new OptionDefinition(ExecConstants.AFFINITY_FACTOR),
       new OptionDefinition(ExecConstants.MAX_WIDTH_GLOBAL),
@@ -237,11 +239,13 @@
       new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, 
new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
     };
 
-    final CaseInsensitiveMap<OptionDefinition> map = 
CaseInsensitiveMap.newHashMap();
+    CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
+      .collect(Collectors.toMap(
+        d -> d.getValidator().getOptionName(),
+        Function.identity(),
+        (o, n) -> n,
+        CaseInsensitiveMap::newHashMap));
 
-    for (final OptionDefinition definition: definitions) {
-      map.put(definition.getValidator().getOptionName(), definition);
-    }
 
     if (AssertionUtil.isAssertionsEnabled()) {
       map.put(ExecConstants.DRILLBIT_CONTROL_INJECTIONS, new 
OptionDefinition(ExecConstants.DRILLBIT_CONTROLS_VALIDATOR));
@@ -295,7 +299,7 @@ public SystemOptionManager(final DrillConfig bootConfig) {
    * Initializes this option manager.
    *
    * @return this option manager
-   * @throws Exception
+   * @throws Exception if unable to initialize option manager
    */
   public SystemOptionManager init() throws Exception {
     options = provider.getOrCreateStore(config);
@@ -395,16 +399,13 @@ public void deleteLocalOption(final String name) {
 
   @Override
   public void deleteAllLocalOptions() {
-    final Set<String> names = Sets.newHashSet();
-    for (final Map.Entry<String, PersistedOptionValue> entry : 
Lists.newArrayList(options.getAll())) {
-      names.add(entry.getKey());
-    }
-    for (final String name : names) {
-      options.delete(name); // should be lowercase
-    }
+    Iterable<Map.Entry<String, PersistedOptionValue>> allOptions = () -> 
options.getAll();
+    StreamSupport.stream(allOptions.spliterator(), false)
+      .map(Entry::getKey)
+      .forEach(name -> options.delete(name)); // should be lowercase
   }
 
-  public static CaseInsensitiveMap<OptionValue> 
populateDefaultValues(Map<String, OptionDefinition> definitions, DrillConfig 
bootConfig) {
+  private CaseInsensitiveMap<OptionValue> populateDefaultValues(Map<String, 
OptionDefinition> definitions, DrillConfig bootConfig) {
     // populate the options from the config
     final Map<String, OptionValue> defaults = new HashMap<>();
 
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index de2b2c3b7ab..d37a0a2121d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -30,6 +30,7 @@
 
 import com.google.common.collect.ImmutableSet;
 import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Abstract class for StorePlugin implementations.
  * See StoragePlugin for description of the interface intent and its methods.
@@ -102,11 +103,22 @@ public boolean supportsWrite() {
     }
   }
 
+
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection, SessionOptionManager options) throws IOException {
+    return getPhysicalScan(userName, selection);
+  }
+
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection) throws IOException {
     return getPhysicalScan(userName, selection, AbstractGroupScan.ALL_COLUMNS);
   }
 
+  @Override
+  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection, List<SchemaPath> columns, SessionOptionManager options) throws 
IOException {
+    return getPhysicalScan(userName, selection, columns);
+  }
+
   @Override
   public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection, List<SchemaPath> columns) throws IOException {
     throw new UnsupportedOperationException();
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 50f2731021e..2617065ad05 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -27,6 +27,7 @@
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
 import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.server.options.SessionOptionManager;
 
 /** Interface for all implementations of the storage plugins. Different 
implementations of the storage
  * formats will implement methods that indicate if Drill can write or read its 
tables from that format,
@@ -36,18 +37,18 @@
 
   /** Indicates if Drill can read the table from this format.
   */
-  public boolean supportsRead();
+  boolean supportsRead();
 
   /** Indicates if Drill can write a table to this format (e.g. as JSON, csv, 
etc.).
    */
-  public boolean supportsWrite();
+  boolean supportsWrite();
 
   /** An implementation of this method will return one or more specialized 
rules that Drill query
    *  optimizer can leverage in <i>physical</i> space. Otherwise, it should 
return an empty set.
    * @return an empty set or a set of plugin specific physical optimizer rules.
    */
   @Deprecated
-  public Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext 
optimizerContext);
+  Set<? extends RelOptRule> getOptimizerRules(OptimizerRulesContext 
optimizerContext);
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -55,9 +56,18 @@
    * @param userName User whom to impersonate when when reading the contents 
as part of Scan.
    * @param selection The configured storage engine specific selection.
    * @return The physical scan operator for the particular GroupScan (read) 
node.
-   * @throws IOException
    */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection) throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection) 
throws IOException;
+
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when when reading the contents 
as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) 
node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, 
SessionOptionManager options) throws IOException;
 
   /**
    * Get the physical scan operator for the particular GroupScan (read) node.
@@ -66,18 +76,29 @@
    * @param selection The configured storage engine specific selection.
    * @param columns (optional) The list of column names to scan from the data 
source.
    * @return The physical scan operator for the particular GroupScan (read) 
node.
-   * @throws IOException
   */
-  public AbstractGroupScan getPhysicalScan(String userName, JSONOptions 
selection, List<SchemaPath> columns)
-      throws IOException;
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, 
List<SchemaPath> columns) throws IOException;
 
-  /** Method returns a Jackson serializable object that extends a 
StoragePluginConfig
-  * @return an extension of StoragePluginConfig
+  /**
+   * Get the physical scan operator for the particular GroupScan (read) node.
+   *
+   * @param userName User whom to impersonate when when reading the contents 
as part of Scan.
+   * @param selection The configured storage engine specific selection.
+   * @param columns (optional) The list of column names to scan from the data 
source.
+   * @param options (optional) session options
+   * @return The physical scan operator for the particular GroupScan (read) 
node.
+   */
+  AbstractGroupScan getPhysicalScan(String userName, JSONOptions selection, 
List<SchemaPath> columns, SessionOptionManager options) throws IOException;
+
+  /**
+   * Method returns a Jackson serializable object that extends a 
StoragePluginConfig.
+   *
+   * @return an extension of StoragePluginConfig
   */
-  public StoragePluginConfig getConfig();
+  StoragePluginConfig getConfig();
 
   /**
    * Initialize the storage plugin. The storage plugin will not be used until 
this method is called.
    */
-  public void start() throws IOException;
+  void start() throws IOException;
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 4b1e9ddf3f1..b0cc209cd0a 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -555,6 +555,10 @@ drill.exec.options: {
     store.hive.optimize_scan_with_native_readers: false,
     store.hive.parquet.optimize_scan_with_native_reader: false,
     store.hive.maprdb_json.optimize_scan_with_native_reader: false,
+    # Properties values should NOT be set in double-quotes or any other quotes.
+    # Property name and value should be separated by =.
+    # Properties should be separated by new line (\n).
+    store.hive.conf.properties: "",
     store.json.all_text_mode: false,
     store.json.writer.allow_nan_inf: true,
     store.json.reader.allow_nan_inf: true,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add store.hive.conf.properties option to allow set Hive properties at session 
> level
> -----------------------------------------------------------------------------------
>
>                 Key: DRILL-6575
>                 URL: https://issues.apache.org/jira/browse/DRILL-6575
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.13.0
>            Reporter: Arina Ielchiieva
>            Assignee: Arina Ielchiieva
>            Priority: Major
>              Labels: doc-impacting, ready-to-commit
>             Fix For: 1.14.0
>
>
> *Use case*
> Hive external table ddl:
> {noformat}
> create external table my(key int, val string)
> row format delimited
> fields terminated by ','
> stored as textfile
> location '/data/my_tbl';
> {noformat}
> Path {{/data/my_tbl}} contains a subdirectory with a file in it: 
> {{/data/my_tbl/sub_dir/data.txt}} with the following data:
> {noformat}
> 1, value_1
> 2, value_2
> {noformat}
> When querying such a table from Hive, the user gets the following exception:
> {noformat}
> Failed with exception java.io.IOException:java.io.IOException: Not a file: 
> file:///data/my_tbl/sub_dir
> {noformat}
> To be able to query this table, the user needs to set two properties to true: 
> {{hive.mapred.supports.subdirectories}} and {{mapred.input.dir.recursive}}.
>  They can be set at system level in hive-site.xml or at session level in the Hive 
> console:
> {noformat}
> set hive.mapred.supports.subdirectories=true;
> set mapred.input.dir.recursive=true;
> {noformat}
> Currently, to be able to query such a table from Drill, the user can specify these 
> properties in the Hive plugin configuration only:
> {noformat}
> {
>   "type": "hive",
>   "configProps": {
>     "hive.metastore.uris": "thrift://localhost:9083",
>     "hive.metastore.sasl.enabled": "false",
>     "hbase.zookeeper.quorum": "localhost",
>     "hbase.zookeeper.property.clientPort": "5181",
>     "hive.mapred.supports.subdirectories": "true",
>     "mapred.input.dir.recursive": "true"
>   },
>   "enabled": true
> }
> {noformat}
> *Jira scope*
>  This Jira aims to add a new session option to Drill, 
> {{store.hive.conf.properties}}, which will allow the user to specify Hive 
> properties at session level. 
>  The user should write the properties as a single string, delimited by the new line symbol. 
> Property values should NOT be enclosed in double quotes or any other quotes, 
> otherwise they will be parsed incorrectly. Property name and value should be 
> separated by {{=}}. Each `alter session set` will override the properties previously set 
> at session level. If Drill cannot parse the property string during a query, 
> a warning will be logged. Properties will be parsed by loading them into 
> {{java.util.Properties}}. The default value is an empty string ("").
> Example:
> {noformat}
> alter session set `store.hive.conf.properties` = 
> 'hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true';
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to