DRILL-1172: Apply the Hive config overrides given in the storage plugin when reading data from Hive tables
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0d328d5a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0d328d5a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0d328d5a Branch: refs/heads/master Commit: 0d328d5ad91fef6dd9f93f95d324a6e74193c2c6 Parents: d2d047a Author: vkorukanti <venki.koruka...@gmail.com> Authored: Mon Jul 28 19:00:33 2014 -0700 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Tue Jul 29 15:54:32 2014 -0700 ---------------------------------------------------------------------- .../apache/drill/exec/store/hive/HiveReadEntry.java | 8 +++++++- .../drill/exec/store/hive/HiveRecordReader.java | 7 ++++++- .../org/apache/drill/exec/store/hive/HiveScan.java | 6 ++++++ .../drill/exec/store/hive/HiveScanBatchCreator.java | 5 +++-- .../drill/exec/store/hive/HiveStoragePlugin.java | 2 +- .../exec/store/hive/HiveStoragePluginConfig.java | 15 ++------------- .../drill/exec/store/hive/HiveTextRecordReader.java | 2 +- .../exec/store/hive/schema/HiveSchemaFactory.java | 14 ++++++++++++-- 8 files changed, 38 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java index 32f793e..13845ae 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveReadEntry.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.hive; import 
java.util.List; +import java.util.Map; import net.hydromatic.optiq.Schema.TableType; @@ -35,12 +36,16 @@ public class HiveReadEntry { public HiveTable table; @JsonProperty("partitions") public List<HiveTable.HivePartition> partitions; + @JsonProperty("hiveConfigOverride") + public Map<String, String> hiveConfigOverride; @JsonIgnore private List<Partition> partitionsUnwrapped = Lists.newArrayList(); @JsonCreator - public HiveReadEntry(@JsonProperty("table") HiveTable table, @JsonProperty("partitions") List<HiveTable.HivePartition> partitions) { + public HiveReadEntry(@JsonProperty("table") HiveTable table, + @JsonProperty("partitions") List<HiveTable.HivePartition> partitions, + @JsonProperty("hiveConfigOverride") Map<String, String> hiveConfigOverride) { this.table = table; this.partitions = partitions; if (partitions != null) { @@ -48,6 +53,7 @@ public class HiveReadEntry { partitionsUnwrapped.add(part.getPartition()); } } + this.hiveConfigOverride = hiveConfigOverride; } @JsonIgnore http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 8a84ac2..ac0fe36 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -100,18 +100,20 @@ public class HiveRecordReader implements RecordReader { protected List<ValueVector> pVectors = Lists.newArrayList(); protected Object redoRecord; protected boolean empty; + private Map<String, String> hiveConfigOverride; protected static final int TARGET_RECORD_COUNT = 4000; protected static final int 
FIELD_SIZE = 50; public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, - FragmentContext context) throws ExecutionSetupException { + FragmentContext context, Map<String, String> hiveConfigOverride) throws ExecutionSetupException { this.table = table; this.partition = partition; this.inputSplit = inputSplit; this.context = context; this.projectedColumns = projectedColumns; this.empty = (inputSplit == null && partition == null); + this.hiveConfigOverride = hiveConfigOverride; init(); } @@ -135,6 +137,9 @@ public class HiveRecordReader implements RecordReader { for (Object obj : properties.keySet()) { job.set((String) obj, (String) properties.get(obj)); } + for(Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) { + job.set(entry.getKey(), entry.getValue()); + } InputFormat format; String sLib = (partition == null) ? table.getSd().getSerdeInfo().getSerializationLib() : partition.getSd().getSerdeInfo().getSerializationLib(); String inputFormatName = (partition == null) ? 
table.getSd().getInputFormat() : partition.getSd().getInputFormat(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 61b6c7f..451a5c3 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -138,6 +138,9 @@ public class HiveScan extends AbstractGroupScan { for (Object obj : properties.keySet()) { job.set((String) obj, (String) properties.get(obj)); } + for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { + job.set(entry.getKey(), entry.getValue()); + } InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(table.getSd().getInputFormat()).getConstructor().newInstance(); job.setInputFormat(format.getClass()); Path path = new Path(table.getSd().getLocation()); @@ -159,6 +162,9 @@ public class HiveScan extends AbstractGroupScan { for (Object obj : properties.keySet()) { job.set((String) obj, (String) properties.get(obj)); } + for(Map.Entry<String, String> entry : hiveReadEntry.hiveConfigOverride.entrySet()) { + job.set(entry.getKey(), entry.getValue()); + } InputFormat<?, ?> format = (InputFormat<?, ?>) Class.forName(partition.getSd().getInputFormat()).getConstructor().newInstance(); job.setInputFormat(format.getClass()); Path path = new Path(partition.getSd().getLocation()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java index 6e540ad..616fae3 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java @@ -57,14 +57,15 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> { for (InputSplit split : splits) { readers.add(new HiveRecordReader(table, (hasPartitions ? partitions.get(i++) : null), - split, config.getColumns(), context)); + split, config.getColumns(), context, config.getHiveReadEntry().hiveConfigOverride)); } //} // If there are no readers created (which is possible when the table is empty), create an empty RecordReader to // output the schema if (readers.size() == 0) { - readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context)); + readers.add(new HiveRecordReader(table, null, null, config.getColumns(), context, + config.getHiveReadEntry().hiveConfigOverride)); } return new ScanBatch(config, context, readers.iterator()); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index da8a41a..c0ac4ba 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -48,7 +48,7 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { public 
HiveStoragePlugin(HiveStoragePluginConfig config, DrillbitContext context, String name) throws ExecutionSetupException { this.config = config; this.context = context; - this.schemaFactory = new HiveSchemaFactory(this, name, config.getHiveConf()); + this.schemaFactory = new HiveSchemaFactory(this, name, config.getHiveConfigOverride()); this.name = name; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java index cbd7906..55fa304 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePluginConfig.java @@ -31,23 +31,12 @@ import java.util.Map; public class HiveStoragePluginConfig extends StoragePluginConfigBase { @JsonProperty public Map<String, String> configProps; - @JsonIgnore - private HiveConf hiveConf; public static final String NAME = "hive"; @JsonIgnore - public HiveConf getHiveConf() { - if (hiveConf == null) { - hiveConf = new HiveConf(); - if (configProps != null) { - for (Map.Entry<String, String> entry : configProps.entrySet()) { - hiveConf.set(entry.getKey(), entry.getValue()); - } - } - } - - return hiveConf; + public Map<String, String> getHiveConfigOverride() { + return configProps; } @JsonCreator http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java ---------------------------------------------------------------------- diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java index 116603c..05ee59f 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java @@ -48,7 +48,7 @@ public class HiveTextRecordReader extends HiveRecordReader { private final int numCols; public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> projectedColumns, FragmentContext context) throws ExecutionSetupException { - super(table, partition, inputSplit, projectedColumns, context); + super(table, partition, inputSplit, projectedColumns, context, null); String d = table.getSd().getSerdeInfo().getParameters().get("field.delim"); if (d != null) { delimiter = d.getBytes()[0]; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d328d5a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java index 2f0b267..ef2ac83 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive.schema; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -63,11 +64,20 @@ public class HiveSchemaFactory 
implements SchemaFactory { private LoadingCache<String, LoadingCache<String, HiveReadEntry>> tableLoaders; private HiveStoragePlugin plugin; private final String schemaName; + private final Map<String, String> hiveConfigOverride; - public HiveSchemaFactory(HiveStoragePlugin plugin, String name, HiveConf hiveConf) throws ExecutionSetupException { + public HiveSchemaFactory(HiveStoragePlugin plugin, String name, Map<String, String> hiveConfigOverride) throws ExecutionSetupException { this.schemaName = name; this.plugin = plugin; + this.hiveConfigOverride = hiveConfigOverride; + HiveConf hiveConf = new HiveConf(); + if (hiveConfigOverride != null) { + for (Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) { + hiveConf.set(entry.getKey(), entry.getValue()); + } + } + try { this.mClient = new HiveMetaStoreClient(hiveConf); } catch (MetaException e) { @@ -168,7 +178,7 @@ public class HiveSchemaFactory implements SchemaFactory { if (hivePartitions.size() == 0) hivePartitions = null; - return new HiveReadEntry(new HiveTable(t), hivePartitions); + return new HiveReadEntry(new HiveTable(t), hivePartitions, hiveConfigOverride); }