DRILL-1172: Consider Hive override config given in storage plugin for reading data in Hive tables


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0d328d5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0d328d5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0d328d5a

Branch: refs/heads/master
Commit: 0d328d5ad91fef6dd9f93f95d324a6e74193c2c6
Parents: d2d047a
Author: vkorukanti <venki.koruka...@gmail.com>
Authored: Mon Jul 28 19:00:33 2014 -0700
Committer: Jacques Nadeau <jacq...@apache.org>
Committed: Tue Jul 29 15:54:32 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/store/hive/HiveReadEntry.java  |  8 +++++++-
 .../drill/exec/store/hive/HiveRecordReader.java      |  7 ++++++-
 .../org/apache/drill/exec/store/hive/HiveScan.java   |  6 ++++++
 .../drill/exec/store/hive/HiveScanBatchCreator.java  |  5 +++--
 .../drill/exec/store/hive/HiveStoragePlugin.java     |  2 +-
 .../exec/store/hive/HiveStoragePluginConfig.java     | 15 ++-------------
 .../drill/exec/store/hive/HiveTextRecordReader.java  |  2 +-
 .../exec/store/hive/schema/HiveSchemaFactory.java    | 14 ++++++++++++--
 8 files changed, 38 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
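
What the change does, in short: the Hive storage plugin's configProps map now travels as hiveConfigOverride inside HiveReadEntry, so HiveScan and HiveRecordReader can set those entries on the JobConf after the table/partition properties (the override therefore wins), and HiveSchemaFactory builds the HiveConf for its metastore client from the same map. Below is a minimal standalone sketch of that precedence pattern; the class and method names are my own for illustration and are not part of this patch.

import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.mapred.JobConf;

public class HiveOverrideSketch {
  // Table/partition properties go into the JobConf first; plugin-level overrides
  // are set last, so an override key replaces whatever came from the Hive metastore.
  static JobConf configure(Properties tableProps, Map<String, String> hiveConfigOverride) {
    JobConf job = new JobConf();
    for (Object key : tableProps.keySet()) {
      job.set((String) key, (String) tableProps.get(key));
    }
    if (hiveConfigOverride != null) {   // null check added here for the sketch only
      for (Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
        job.set(entry.getKey(), entry.getValue());
      }
    }
    return job;
  }
}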


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
index 32f793e..13845ae 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.hive;
 
 import java.util.List;
+import java.util.Map;
 
 import net.hydromatic.optiq.Schema.TableType;
 
@@ -35,12 +36,16 @@ public class HiveReadEntry {
   public HiveTable table;
   @JsonProperty("partitions")
   public List<HiveTable.HivePartition> partitions;
+  @JsonProperty("hiveConfigOverride")
+  public Map<String, String> hiveConfigOverride;
 
   @JsonIgnore
   private List<Partition> partitionsUnwrapped = Lists.newArrayList();
 
   @JsonCreator
-  public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) {
+  public HiveReadEntry(@JsonProperty("table") HiveTable table,
+                       @JsonProperty("partitions") List<HiveTable.HivePartition> partitions,
+                       @JsonProperty("hiveConfigOverride") Map<String, String> hiveConfigOverride) {
     this.table = table;
     this.partitions = partitions;
     if (partitions != null) {
@@ -48,6 +53,7 @@ public class HiveReadEntry {
         partitionsUnwrapped.add(part.getPartition());
       }
     }
+    this.hiveConfigOverride = hiveConfigOverride;
   }
 
   @JsonIgnore

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 8a84ac2..ac0fe36 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -100,18 +100,20 @@ public class HiveRecordReader implements RecordReader {
   protected List<ValueVector> pVectors = Lists.newArrayList();
   protected Object redoRecord;
   protected boolean empty;
+  private Map<String, String> hiveConfigOverride;
 
   protected static final int TARGET_RECORD_COUNT = 4000;
   protected static final int FIELD_SIZE = 50;
 
  public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns,
-      FragmentContext context) throws ExecutionSetupException {
+      FragmentContext context, Map<String, String> hiveConfigOverride) throws ExecutionSetupException {
     this.table = table;
     this.partition = partition;
     this.inputSplit = inputSplit;
     this.context = context;
     this.projectedColumns = projectedColumns;
     this.empty = (inputSplit == null && partition == null);
+    this.hiveConfigOverride = hiveConfigOverride;
     init();
   }
 
@@ -135,6 +137,9 @@ public class HiveRecordReader implements RecordReader {
     for (Object obj : properties.keySet()) {
       job.set((String) obj, (String) properties.get(obj));
     }
+    for(Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
+      job.set(entry.getKey(), entry.getValue());
+    }
     InputFormat format;
    String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib();
    String inputFormatName = (partition == null) ? table.getSd().getInputFormat() : partition.getSd().getInputFormat();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 61b6c7f..451a5c3 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -138,6 +138,9 @@ public class HiveScan extends AbstractGroupScan {
         for (Object obj : properties.keySet()) {
           job.set((String) obj, (String) properties.get(obj));
         }
+        for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+          job.set(entry.getKey(), entry.getValue());
+        }
        InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance();
         job.setInputFormat(format.getClass());
         Path path = new Path(table.getSd().getLocation());
@@ -159,6 +162,9 @@ public class HiveScan extends AbstractGroupScan {
           for (Object obj : properties.keySet()) {
             job.set((String) obj, (String) properties.get(obj));
           }
+          for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) {
+            job.set(entry.getKey(), entry.getValue());
+          }
          InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance();
           job.setInputFormat(format.getClass());
           Path path = new Path(partition.getSd().getLocation());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index 6e540ad..616fae3 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -57,14 +57,15 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
       for (InputSplit split : splits) {
         readers.add(new HiveRecordReader(table,
             (hasPartitions ? partitions.get(i++) : null),
-            split, config.getColumns(), context));
+            split, config.getColumns(), context, config.getHiveReadEntry().hiveConfigOverride));
       }
     //}
 
    // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to
     // output the schema
     if (readers.size() == 0) {
-      readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context));
+      readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context,
+          config.getHiveReadEntry().hiveConfigOverride));
     }
 
     return new ScanBatch(config, context, readers.iterator());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index da8a41a..c0ac4ba 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -48,7 +48,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
  public HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException {
     this.config = config;
     this.context = context;
-    this.schemaFactory = new HiveSchemaFactory(this, name, config.getHiveConf());
+    this.schemaFactory = new HiveSchemaFactory(this, name, config.getHiveConfigOverride());
     this.name = name;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
index cbd7906..55fa304 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java
@@ -31,23 +31,12 @@ import java.util.Map;
 public class HiveStoragePluginConfig extends StoragePluginConfigBase {
   @JsonProperty
   public Map<String, String> configProps;
-  @JsonIgnore
-  private HiveConf hiveConf;
 
   public static final String NAME = "hive";
 
   @JsonIgnore
-  public HiveConf getHiveConf() {
-    if (hiveConf == null) {
-      hiveConf = new HiveConf();
-      if (configProps != null) {
-        for (Map.Entry<String, String> entry : configProps.entrySet()) {
-          hiveConf.set(entry.getKey(), entry.getValue());
-        }
-      }
-    }
-
-    return hiveConf;
+  public Map<String, String> getHiveConfigOverride() {
+    return configProps;
   }
 
   @JsonCreator
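
With this change getHiveConf() is gone: the plugin config simply exposes its configProps map, and the HiveConf is built from that map where it is needed (see the HiveSchemaFactory change below). Purely for illustration, the override entries a plugin definition might carry could be built like this; the keys and values are ordinary Hive/Hadoop properties chosen as examples, not anything taken from this commit, and ImmutableMap is Guava, which Drill already uses elsewhere:

    // Hypothetical override entries; any Hive/Hadoop configuration key can be used.
    Map<String, String> configProps = ImmutableMap.of(
        "hive.metastore.uris", "thrift://hivehost:9083",
        "fs.default.name", "hdfs://namenode:8020");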

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
index 116603c..05ee59f 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -48,7 +48,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
   private final int numCols;
 
  public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, FragmentContext context) throws ExecutionSetupException {
-    super(table, partition, inputSplit, projectedColumns, context);
+    super(table, partition, inputSplit, projectedColumns, context, null);
     String d = table.getSd().getSerdeInfo().getParameters().get("field.delim");
     if (d != null) {
       delimiter = d.getBytes()[0];

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
index 2f0b267..ef2ac83 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive.schema;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -63,11 +64,20 @@ public class HiveSchemaFactory implements SchemaFactory {
  private LoadingCache<String, LoadingCache<String, HiveReadEntry>> tableLoaders;
   private HiveStoragePlugin plugin;
   private final String schemaName;
+  private final Map<String, String> hiveConfigOverride;
 
-  public HiveSchemaFactory(HiveStoragePlugin plugin, String name, HiveConf hiveConf) throws ExecutionSetupException {
+  public HiveSchemaFactory(HiveStoragePlugin plugin, String name, Map<String, String> hiveConfigOverride) throws ExecutionSetupException {
     this.schemaName = name;
     this.plugin = plugin;
 
+    this.hiveConfigOverride = hiveConfigOverride;
+    HiveConf hiveConf = new HiveConf();
+    if (hiveConfigOverride != null) {
+      for (Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
+        hiveConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+
     try {
       this.mClient = new HiveMetaStoreClient(hiveConf);
     } catch (MetaException e) {
@@ -168,7 +178,7 @@ public class HiveSchemaFactory implements SchemaFactory {
 
       if (hivePartitions.size() == 0)
         hivePartitions = null;
-      return new HiveReadEntry(new HiveTable(t), hivePartitions);
+      return new HiveReadEntry(new HiveTable(t), hivePartitions, hiveConfigOverride);
 
     }
 
