HIVE-11981: ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
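For context, here is a minimal HiveQL sketch of the add-columns scenario that the new schema_evol_* q-files and TestCompactor cases below exercise. The statements mirror the schemaEvolutionAddColDynamicPartitioningInsert test; the table name, columns, and values are illustrative and are not part of the commit itself.

-- hive.exec.schema.evolution is the new flag introduced by this commit (default true).
SET hive.exec.schema.evolution=true;

CREATE TABLE dpct (a INT, b STRING)
  PARTITIONED BY (ds STRING)
  CLUSTERED BY (a) INTO 2 BUCKETS          -- currently ACID requires the table to be bucketed
  STORED AS ORC TBLPROPERTIES ('transactional'='true');

INSERT INTO dpct PARTITION (ds) VALUES (1, 'fred', 'today'), (2, 'wilma', 'yesterday');

-- Widen the table schema; already-written ORC files keep their original footer schema.
ALTER TABLE dpct ADD COLUMNS (c INT);

-- Rows written before the ALTER are read through the evolved schema and return NULL for c.
SELECT * FROM dpct ORDER BY a;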
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/249c4ef1 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/249c4ef1 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/249c4ef1 Branch: refs/heads/master Commit: 249c4ef1377860eb6ec9c3aedb5c1f930479a6ec Parents: 7f65e36 Author: Matt McCline <[email protected]> Authored: Wed Nov 18 05:11:35 2015 -0800 Committer: Matt McCline <[email protected]> Committed: Wed Nov 18 05:11:35 2015 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../mapreduce/FosterStorageHandler.java | 37 + .../hive/hcatalog/mapreduce/InputJobInfo.java | 8 +- .../hive/hcatalog/streaming/TestStreaming.java | 2 + .../streaming/mutate/StreamingAssert.java | 2 + .../hive/ql/txn/compactor/TestCompactor.java | 246 ++- .../test/resources/testconfiguration.properties | 16 + .../hive/llap/io/api/impl/LlapInputFormat.java | 40 +- .../org/apache/hadoop/hive/ql/ErrorMsg.java | 7 +- .../hadoop/hive/ql/exec/FetchOperator.java | 7 + .../apache/hadoop/hive/ql/exec/MapOperator.java | 22 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 20 + .../apache/hadoop/hive/ql/exec/Utilities.java | 104 +- .../ql/exec/spark/SparkReduceRecordHandler.java | 11 +- .../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 18 +- .../hive/ql/exec/vector/VectorExtractRow.java | 9 +- .../ql/exec/vector/VectorGroupByOperator.java | 2 +- .../exec/vector/VectorMapJoinBaseOperator.java | 2 +- .../exec/vector/VectorSMBMapJoinOperator.java | 2 +- .../ql/exec/vector/VectorizationContext.java | 35 +- .../ql/exec/vector/VectorizedBatchUtil.java | 204 +- .../ql/exec/vector/VectorizedColumnarSerDe.java | 277 --- .../ql/exec/vector/VectorizedRowBatchCtx.java | 509 ++--- .../mapjoin/VectorMapJoinCommonOperator.java | 66 +- .../VectorMapJoinGenerateResultOperator.java | 5 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 5 + .../apache/hadoop/hive/ql/io/IOConstants.java | 11 + .../io/SelfDescribingInputFormatInterface.java | 27 + .../hive/ql/io/VectorizedRCFileInputFormat.java | 81 - .../ql/io/VectorizedRCFileRecordReader.java | 261 --- .../ql/io/orc/ConversionTreeReaderFactory.java | 38 - .../apache/hadoop/hive/ql/io/orc/OrcFile.java | 2 +- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 60 +- .../hadoop/hive/ql/io/orc/OrcOutputFormat.java | 84 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 61 +- .../apache/hadoop/hive/ql/io/orc/OrcUtils.java | 547 ++++++ .../apache/hadoop/hive/ql/io/orc/Reader.java | 15 + .../hive/ql/io/orc/RecordReaderFactory.java | 274 --- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 17 +- .../hadoop/hive/ql/io/orc/SchemaEvolution.java | 185 ++ .../hive/ql/io/orc/TreeReaderFactory.java | 182 +- .../ql/io/orc/VectorizedOrcAcidRowReader.java | 45 +- .../ql/io/orc/VectorizedOrcInputFormat.java | 45 +- .../parquet/VectorizedParquetInputFormat.java | 25 +- .../hive/ql/optimizer/GenMapRedUtils.java | 18 + .../hive/ql/optimizer/SimpleFetchOptimizer.java | 1 + .../hive/ql/optimizer/physical/Vectorizer.java | 502 +++-- .../ql/optimizer/physical/Vectorizer.java.orig | 1744 ------------------ .../apache/hadoop/hive/ql/plan/BaseWork.java | 30 +- .../hadoop/hive/ql/plan/PartitionDesc.java | 15 +- .../hive/ql/plan/VectorPartitionConversion.java | 166 ++ .../hive/ql/plan/VectorPartitionDesc.java | 110 ++ .../ql/exec/vector/TestVectorRowObject.java | 5 +- .../hive/ql/exec/vector/TestVectorSerDeRow.java | 10 +- 
.../exec/vector/TestVectorizedRowBatchCtx.java | 357 ---- .../hive/ql/io/orc/TestInputOutputFormat.java | 34 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 42 + .../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ++++++++++++ .../schema_evol_orc_acid_mapwork_part.q | 171 ++ .../schema_evol_orc_acid_mapwork_table.q | 129 ++ .../schema_evol_orc_acidvec_mapwork_part.q | 171 ++ .../schema_evol_orc_acidvec_mapwork_table.q | 129 ++ .../schema_evol_orc_nonvec_fetchwork_part.q | 96 + .../schema_evol_orc_nonvec_fetchwork_table.q | 56 + .../schema_evol_orc_nonvec_mapwork_part.q | 96 + .../schema_evol_orc_nonvec_mapwork_table.q | 56 + .../schema_evol_orc_vec_mapwork_part.q | 96 + .../schema_evol_orc_vec_mapwork_table.q | 56 + .../schema_evol_text_fetchwork_table.q | 56 + .../schema_evol_text_mapwork_table.q | 56 + .../schema_evol_text_nonvec_fetchwork_part.q | 96 + .../schema_evol_text_nonvec_fetchwork_table.q | 66 + .../schema_evol_text_nonvec_mapwork_part.q | 96 + .../schema_evol_text_nonvec_mapwork_table.q | 66 + .../schema_evol_orc_acid_mapwork_part.q.out | 1035 +++++++++++ .../schema_evol_orc_acid_mapwork_table.q.out | 649 +++++++ .../schema_evol_orc_acidvec_mapwork_part.q.out | 1035 +++++++++++ .../schema_evol_orc_acidvec_mapwork_table.q.out | 649 +++++++ .../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++ ...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++ .../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++ .../schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_vec_mapwork_table.q.out | 298 +++ .../schema_evol_text_fetchwork_table.q.out | 298 +++ .../schema_evol_text_mapwork_table.q.out | 298 +++ ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++ ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++ .../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++ .../tez/schema_evol_orc_acid_mapwork_part.q.out | 1035 +++++++++++ .../schema_evol_orc_acid_mapwork_table.q.out | 649 +++++++ .../schema_evol_orc_acidvec_mapwork_part.q.out | 1035 +++++++++++ .../schema_evol_orc_acidvec_mapwork_table.q.out | 649 +++++++ .../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++ ...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++ .../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++ .../tez/schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++ .../tez/schema_evol_orc_vec_mapwork_table.q.out | 298 +++ .../tez/schema_evol_text_fetchwork_table.q.out | 298 +++ .../tez/schema_evol_text_mapwork_table.q.out | 298 +++ ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++ ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++ .../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++ .../tez/vector_partition_diff_num_cols.q.out | 1 + .../vector_partition_diff_num_cols.q.out | 1 + .../hive/serde2/typeinfo/TypeInfoUtils.java | 36 + .../hive/ql/exec/vector/VectorizedRowBatch.java | 20 + 111 files changed, 22617 insertions(+), 4063 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 953e52c..7cab9ae 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -969,6 +969,10 @@ public class HiveConf extends Configuration { "The threshold for the input file size of the small tables; if the file size is smaller \n" + "than this threshold, it will try to convert the common join into map join"), + + HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", true, + "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."), + HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 5a95467..bc56d77 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -48,6 +53,8 @@ import java.util.Map; */ public class FosterStorageHandler extends DefaultStorageHandler { + private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class); + public Configuration conf; /** The directory under which data is initially written for a non partitioned table */ protected static final String TEMP_DIR_NAME = "_TEMP"; @@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler { @Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + + try { + Map<String, String> tableProperties = tableDesc.getJobProperties(); + + String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO); + if (jobInfoProperty != null) { + + InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty); + + HCatTableInfo tableInfo = inputJobInfo.getTableInfo(); + HCatSchema dataColumns = tableInfo.getDataColumns(); + List<HCatFieldSchema> dataFields = dataColumns.getFields(); + StringBuilder columnNamesSb = new StringBuilder(); + StringBuilder typeNamesSb = new StringBuilder(); + for (HCatFieldSchema dataField : dataFields) { + if (columnNamesSb.length() > 0) { + columnNamesSb.append(","); + 
typeNamesSb.append(":"); + } + columnNamesSb.append(dataField.getName()); + typeNamesSb.append(dataField.getTypeString()); + } + jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString()); + jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString()); + + } + } catch (IOException e) { + throw new IllegalStateException("Failed to set output path", e); + } + } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java index 1f23f3f..7ec6ae3 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java @@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable { ObjectInputStream partInfoReader = new ObjectInputStream(new InflaterInputStream(ois)); partitions = (List<PartInfo>)partInfoReader.readObject(); - for (PartInfo partInfo : partitions) { - if (partInfo.getTableInfo() == null) { - partInfo.setTableInfo(this.tableInfo); + if (partitions != null) { + for (PartInfo partInfo : partitions) { + if (partInfo.getTableInfo() == null) { + partInfo.setTableInfo(this.tableInfo); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 806dbdb..1723ff1 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -451,6 +451,8 @@ public class TestStreaming { JobConf job = new JobConf(); job.set("mapred.input.dir", partitionPath.toString()); job.set("bucket_count", Integer.toString(buckets)); + job.set("columns", "id,msg"); + job.set("columns.types", "bigint:string"); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(buckets, splits.length); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java index 477ed8c..339e9ef 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java @@ -128,6 +128,8 @@ public class StreamingAssert { JobConf job = new JobConf(); job.set("mapred.input.dir", partitionLocation.toString()); job.set("bucket_count", Integer.toString(table.getSd().getNumBuckets())); + job.set("columns", "id,msg"); + job.set("columns.types", "bigint:string"); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = 
inputFormat.getSplits(job, 1); assertEquals(1, splits.length); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index e2910dd..dabe434 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -128,7 +128,194 @@ public class TestCompactor { driver.close(); } } - + + /** + * Simple schema evolution add columns with partitioning. + * @throws Exception + */ + @Test + public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception { + String tblName = "dpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + + // First INSERT round. + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + // ALTER TABLE ... ADD COLUMNS + executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver); + + // Validate there is an added NULL for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + + // Second INSERT round with new inserts into previously existing partition 'yesterday'. + executeStatementOnDriver("insert into " + tblName + " partition (ds) values " + + "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " + + "(5, 'doc', 1902, 'yesterday')", + driver); + + // Validate there the new insertions for column c. 
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(4, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=last_century", names.get(0)); + Assert.assertEquals("ds=soon", names.get(1)); + Assert.assertEquals("ds=today", names.get(2)); + Assert.assertEquals("ds=yesterday", names.get(3)); + + // Validate after compaction. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + } + + @Test + public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception { + String tblName = "udpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver); + + // Validate the update. 
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1)); + + // ALTER TABLE ... ADD COLUMNS + executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver); + + // Validate there is an added NULL for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + + // Second INSERT round with new inserts into previously existing partition 'yesterday'. + executeStatementOnDriver("insert into " + tblName + " partition (ds) values " + + "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " + + "(5, 'doc', 1902, 'yesterday')", + driver); + + // Validate there the new insertions for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + executeStatementOnDriver("update " + tblName + " set c = 2000", driver); + + // Validate the update of new column c, even in old rows. 
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4)); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + // Set to 1 so insert doesn't set it off but update does + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(4, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=last_century", names.get(0)); + Assert.assertEquals("ds=soon", names.get(1)); + Assert.assertEquals("ds=today", names.get(2)); + Assert.assertEquals("ds=yesterday", names.get(3)); + + // Validate after compaction. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4)); + } + /** * After each major compaction, stats need to be updated on each column of the * table/partition which previously had stats. 
@@ -255,7 +442,9 @@ public class TestCompactor { t.run(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); + if (1 != compacts.size()) { + Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString()); + } Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName, @@ -409,6 +598,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -452,9 +643,12 @@ public class TestCompactor { } } Arrays.sort(names); - Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}); - checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); @@ -466,6 +660,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -500,10 +696,12 @@ public class TestCompactor { FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); - Assert.assertEquals(1, stat.length); + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + } String name = stat[0].getPath().getName(); Assert.assertEquals(name, "base_0000004"); - checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -514,6 +712,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -561,9 +761,12 @@ public class TestCompactor { } } Arrays.sort(names); - Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", - "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}); - checkExpectedTxnsPresent(null, new 
Path[]{resultDelta}, 0, 1L, 4L); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -574,6 +777,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -613,10 +818,17 @@ public class TestCompactor { FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); - Assert.assertEquals(1, stat.length); + if (1 != stat.length) { + Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat)); + } + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + if (!name.equals("base_0000006")) { + Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); + } + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -642,7 +854,8 @@ public class TestCompactor { } } - private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max) + private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, + String columnTypesProperty, int bucket, long min, long max) throws IOException { ValidTxnList txnList = new ValidTxnList() { @Override @@ -678,8 +891,11 @@ public class TestCompactor { OrcInputFormat aif = new OrcInputFormat(); + Configuration conf = new Configuration(); + conf.set("columns", columnNamesProperty); + conf.set("columns.types", columnTypesProperty); AcidInputFormat.RawReader<OrcStruct> reader = - aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas); + aif.getRawReader(conf, false, bucket, txnList, base, deltas); RecordIdentifier identifier = reader.createKey(); OrcStruct value = reader.createValue(); long currentTxn = min; http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 402914c..ece43cc 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -162,6 +162,22 @@ minitez.query.files.shared=alter_merge_2_orc.q,\ ptf_matchpath.q,\ ptf_streaming.q,\ sample1.q,\ + schema_evol_text_nonvec_mapwork_table.q,\ + schema_evol_text_nonvec_fetchwork_table.q,\ + schema_evol_orc_nonvec_fetchwork_part.q,\ + schema_evol_orc_nonvec_mapwork_part.q,\ + schema_evol_text_nonvec_fetchwork_part.q,\ + 
schema_evol_text_nonvec_mapwork_part.q,\ + schema_evol_orc_acid_mapwork_part.q,\ + schema_evol_orc_acid_mapwork_table.q,\ + schema_evol_orc_acidvec_mapwork_table.q,\ + schema_evol_orc_acidvec_mapwork_part.q,\ + schema_evol_orc_vec_mapwork_part.q,\ + schema_evol_text_fetchwork_table.q,\ + schema_evol_text_mapwork_table.q,\ + schema_evol_orc_vec_mapwork_table.q,\ + schema_evol_orc_nonvec_mapwork_table.q,\ + schema_evol_orc_nonvec_fetchwork_table.q,\ selectDistinctStar.q,\ script_env_var1.q,\ script_env_var2.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 51f4c8e..b57366c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@ -33,11 +33,13 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.orc.encoded.Consumer; import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileSplit; @@ -54,7 +56,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; public class LlapInputFormat - implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface { + implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface, + SelfDescribingInputFormatInterface { @SuppressWarnings("rawtypes") private final InputFormat sourceInputFormat; private final ColumnVectorProducer cvp; @@ -104,6 +107,8 @@ public class LlapInputFormat private final SearchArgument sarg; private final String[] columnNames; private final VectorizedRowBatchCtx rbCtx; + private final boolean[] columnsToIncludeTruncated; + private final Object[] partitionValues; private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>(); private ColumnVectorBatch lastCvb = null; @@ -118,19 +123,28 @@ public class LlapInputFormat private long firstReturnTime; public LlapRecordReader( - JobConf job, FileSplit split, List<Integer> includedCols, String hostName) { + JobConf job, FileSplit split, List<Integer> includedCols, String hostName) + throws IOException { this.split = split; this.columnIds = includedCols; this.sarg = ConvertAstToSearchArg.createFromConf(job); this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); this.counters = new QueryFragmentCounters(job); this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName); - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(job, split); - } catch (Exception e) { - throw new 
RuntimeException(e); + + MapWork mapWork = Utilities.getMapWork(job); + rbCtx = mapWork.getVectorizedRowBatchCtx(); + + columnsToIncludeTruncated = rbCtx.getColumnsToIncludeTruncated(job); + + int partitionColumnCount = rbCtx.getPartitionColumnCount(); + if (partitionColumnCount > 0) { + partitionValues = new Object[partitionColumnCount]; + rbCtx.getPartitionValues(rbCtx, job, split, partitionValues); + } else { + partitionValues = null; } + startRead(); } @@ -143,10 +157,8 @@ public class LlapInputFormat // Add partition cols if necessary (see VectorizedOrcInputFormat for details). boolean wasFirst = isFirst; if (isFirst) { - try { - rbCtx.addPartitionColsToBatch(value); - } catch (HiveException e) { - throw new IOException(e); + if (partitionValues != null) { + rbCtx.addPartitionColsToBatch(value, partitionValues); } isFirst = false; } @@ -244,11 +256,7 @@ public class LlapInputFormat @Override public VectorizedRowBatch createValue() { - try { - return rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } + return rbCtx.createVectorizedRowBatch(columnsToIncludeTruncated); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 39a881a..892587a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -484,8 +484,13 @@ public enum ErrorMsg { INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" + - " file format of the destination table.") + " file format of the destination table."), + SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " + + "schema.evolution.columns / schema.evolution.columns.types " + + "nor the " + + "columns / columns.types " + + "are set. 
Table schema information is required to read ACID tables") ; private int errorCode; http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 157115b..ad36093 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -132,6 +132,10 @@ public class FetchOperator implements Serializable { this.job = job; this.work = work; this.operator = operator; + if (operator instanceof TableScanOperator) { + Utilities.addTableSchemaToConf(job, + (TableScanOperator) operator); + } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); this.isStatReader = work.getTblDesc() == null; @@ -599,6 +603,9 @@ public class FetchOperator implements Serializable { } private boolean needConversion(PartitionDesc partitionDesc) { + if (Utilities.isInputFileFormatSelfDescribing(partitionDesc)) { + return false; + } return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc)); } http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index c2a5726..99724c1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -63,6 +64,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -202,8 +204,13 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon opCtx.partName = String.valueOf(partSpec); opCtx.deserializer = pd.getDeserializer(hconf); - StructObjectInspector partRawRowObjectInspector = - (StructObjectInspector) opCtx.deserializer.getObjectInspector(); + StructObjectInspector partRawRowObjectInspector; + if (Utilities.isInputFileFormatSelfDescribing(pd)) { + partRawRowObjectInspector = tableRowOI; + } else { + partRawRowObjectInspector = + (StructObjectInspector) opCtx.deserializer.getObjectInspector(); + } opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI); @@ -304,8 +311,15 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile); TableDesc tableDesc = pd.getTableDesc(); Deserializer partDeserializer = pd.getDeserializer(hconf); - StructObjectInspector 
partRawRowObjectInspector = - (StructObjectInspector) partDeserializer.getObjectInspector(); + + StructObjectInspector partRawRowObjectInspector; + if (Utilities.isInputFileFormatSelfDescribing(pd)) { + Deserializer tblDeserializer = tableDesc.getDeserializer(hconf); + partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector(); + } else { + partRawRowObjectInspector = + (StructObjectInspector) partDeserializer.getObjectInspector(); + } StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc); if ((tblRawRowObjectInspector == null) || http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index 6e4f474..90c83e6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -67,6 +67,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements private String defaultPartitionName; + /** + * These values are saved during MapWork, FetchWork, etc preparation and later added to the the + * JobConf of each task. + */ + private String schemaEvolutionColumns; + private String schemaEvolutionColumnsTypes; + public TableDesc getTableDesc() { return tableDesc; } @@ -75,6 +82,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements this.tableDesc = tableDesc; } + public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) { + this.schemaEvolutionColumns = schemaEvolutionColumns; + this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes; + } + + public String getSchemaEvolutionColumns() { + return schemaEvolutionColumns; + } + + public String getSchemaEvolutionColumnsTypes() { + return schemaEvolutionColumnsTypes; + } + /** * Other than gathering statistics for the ANALYZE command, the table scan operator * does not do anything special other than just forwarding the row. 
Since the table http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index de2eb98..0700e2f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -106,6 +106,7 @@ import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -121,6 +122,8 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; @@ -128,11 +131,14 @@ import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper; @@ -173,6 +179,12 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.Serializer; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; @@ -473,11 +485,6 @@ public final class Utilities { } } - public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) { - MapWork mapWork = getMapWork(hiveConf); - return mapWork.getVectorScratchColumnTypeMap(); - } - public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { try 
{ Graph stageGraph = plan.getQueryPlan().getStageGraph(); @@ -3782,6 +3789,22 @@ public final class Utilities { return false; } + /** + * @param conf + * @return the configured VectorizedRowBatchCtx for a MapWork task. + */ + public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) { + VectorizedRowBatchCtx result = null; + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && + Utilities.getPlanPath(conf) != null) { + MapWork mapWork = Utilities.getMapWork(conf); + if (mapWork != null && mapWork.getVectorMode()) { + result = mapWork.getVectorizedRowBatchCtx(); + } + } + return result; + } + public static boolean isVectorMode(Configuration conf, MapWork mapWork) { return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && mapWork.getVectorMode(); @@ -3993,6 +4016,7 @@ public final class Utilities { } } + public static List<String> getStatsTmpDirs(BaseWork work, Configuration conf) { List<String> statsTmpDirs = new ArrayList<>(); @@ -4020,4 +4044,74 @@ public final class Utilities { } return statsTmpDirs; } + + public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) { + Class<?> inputFormatClass = pd.getInputFileFormatClass(); + return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass); + } + + public static boolean isInputFileFormatVectorized(PartitionDesc pd) { + Class<?> inputFormatClass = pd.getInputFileFormatClass(); + return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass); + } + + public static void addSchemaEvolutionToTableScanOperator(Table table, + TableScanOperator tableScanOp) { + String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols()); + String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols()); + tableScanOp.setSchemaEvolution(colNames, colTypes); + } + + public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI, + TableScanOperator tableScanOp) { + String colNames = ObjectInspectorUtils.getFieldNames(structOI); + String colTypes = ObjectInspectorUtils.getFieldTypes(structOI); + tableScanOp.setSchemaEvolution(colNames, colTypes); + } + + public static void unsetSchemaEvolution(Configuration conf) { + conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS); + conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES); + } + + public static void addTableSchemaToConf(Configuration conf, + TableScanOperator tableScanOp) { + String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns(); + if (schemaEvolutionColumns != null) { + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes()); + } else { + LOG.info("schema.evolution.columns and schema.evolution.columns.types not available"); + } + } + + /** + * Create row key and value object inspectors for reduce vectorization. + * The row object inspector used by ReduceWork needs to be a **standard** + * struct object inspector, not just any struct object inspector. + * @param keyInspector + * @param valueInspector + * @return OI + * @throws HiveException + */ + public static StandardStructObjectInspector constructVectorizedReduceRowOI( + StructObjectInspector keyInspector, StructObjectInspector valueInspector) + throws HiveException { + + ArrayList<String> colNames = new ArrayList<String>(); + ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); + List<? 
extends StructField> fields = keyInspector.getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + fields = valueInspector.getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); + + return rowObjectInspector; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index 5fbefec..439e0df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; @@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { /* vectorization only works with struct object inspectors */ valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag]; - ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil. 
- constructVectorizedRowBatch(keyStructInspector, - valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap()); - batches[tag] = pair.getFirst(); final int totalColumns = keysColumnOffset + valueStructInspectors[tag].getAllStructFieldRefs().size(); valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns); @@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory .genVectorStructExpressionWritables(valueStructInspectors[tag]))); - rowObjectInspector[tag] = pair.getSecond(); + rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector, + valueStructInspectors[tag]); + batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch(); + + } else { ois.add(keyObjectInspector); ois.add(valueObjectInspector[tag]); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 8768847..efcf88c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -244,7 +244,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorScratchColumnTypeMap()); + redWork.getVectorizedRowBatchCtx()); ois[tag] = sources[tag].getObjectInspector(); } http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index aff5765..1f75d07 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -22,11 +22,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.List; -import java.util.Map; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.hive.common.ObjectPair; import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -35,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -52,7 +50,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import 
org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobConf; @@ -124,7 +121,7 @@ public class ReduceRecordSource implements RecordSource { void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, - Map<Integer, String> vectorScratchColumnTypeMap) + VectorizedRowBatchCtx batchContext) throws Exception { ObjectInspector keyObjectInspector; @@ -175,10 +172,9 @@ public class ReduceRecordSource implements RecordSource { .asList(VectorExpressionWriterFactory .genVectorStructExpressionWritables(valueStructInspectors))); - ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = - VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap); - rowObjectInspector = pair.getSecond(); - batch = pair.getFirst(); + rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector, + valueStructInspectors); + batch = batchContext.createVectorizedRowBatch(); // Setup vectorized deserialization for the key and value. BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer; @@ -186,7 +182,7 @@ public class ReduceRecordSource implements RecordSource { keyBinarySortableDeserializeToRow = new VectorDeserializeRow<BinarySortableDeserializeRead>( new BinarySortableDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + VectorizedBatchUtil.typeInfosFromStructObjectInspector( keyStructInspector), binarySortableSerDe.getSortOrders())); keyBinarySortableDeserializeToRow.init(0); @@ -196,7 +192,7 @@ public class ReduceRecordSource implements RecordSource { valueLazyBinaryDeserializeToRow = new VectorDeserializeRow<LazyBinaryDeserializeRead>( new LazyBinaryDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + VectorizedBatchUtil.typeInfosFromStructObjectInspector( valueStructInspectors))); valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset); http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java index 94a60be..4100bc5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorExtractRow.java @@ -468,6 +468,9 @@ public abstract class VectorExtractRow { int start = colVector.start[adjustedIndex]; int length = colVector.length[adjustedIndex]; + if (value == null) { + LOG.info("null string entry: batchIndex " + batchIndex + " columnIndex " + columnIndex); + } // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String. 
         text.set(value, start, length);
@@ -727,9 +730,9 @@ public abstract class VectorExtractRow {
   }

   public void extractRow(int batchIndex, Object[] objects) {
-    int i = 0;
-    for (Extractor extracter : extracters) {
-      objects[i++] = extracter.extract(batchIndex);
+    for (int i = 0; i < extracters.length; i++) {
+      Extractor extracter = extracters[i];
+      objects[i] = extracter.extract(batchIndex);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
index 35bbaef..0524c08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
@@ -813,7 +813,7 @@ public class VectorGroupByOperator extends Operator<GroupByDesc> implements
         outputFieldNames, objectInspectors);
     if (isVectorOutput) {
       vrbCtx = new VectorizedRowBatchCtx();
-      vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) outputObjInspector);
+      vrbCtx.init((StructObjectInspector) outputObjInspector, vOutContext.getScratchColumnTypeNames());
       outputBatch = vrbCtx.createVectorizedRowBatch();
       vectorAssignRowSameBatch = new VectorAssignRowSameBatch();
       vectorAssignRowSameBatch.init((StructObjectInspector) outputObjInspector, vOutContext.getProjectedColumns());

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
index e378d0d..4b1d9ad 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinBaseOperator.java
@@ -90,7 +90,7 @@ public class VectorMapJoinBaseOperator extends MapJoinOperator implements Vector
     super.initializeOp(hconf);

     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
     outputBatch = vrbCtx.createVectorizedRowBatch();

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
index dcd2d57..9ff9b77 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java
@@ -146,7 +146,7 @@ public class VectorSMBMapJoinOperator extends SMBMapJoinOperator implements Vect
     super.initializeOp(hconf);

     vrbCtx = new VectorizedRowBatchCtx();
-    vrbCtx.init(vOutContext.getScratchColumnTypeMap(), (StructObjectInspector) this.outputObjInspector);
+    vrbCtx.init((StructObjectInspector) this.outputObjInspector, vOutContext.getScratchColumnTypeNames());
     outputBatch = vrbCtx.createVectorizedRowBatch();

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
index e7a829e..95a4b9d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
@@ -144,6 +144,8 @@ public class VectorizationContext {

   VectorExpressionDescriptor vMap;

+  private List<String> initialColumnNames;
+
   private List<Integer> projectedColumns;
   private List<String> projectionColumnNames;
   private Map<String, Integer> projectionColumnMap;
@@ -161,6 +163,7 @@ public class VectorizationContext {
       LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames);
     }
+    this.initialColumnNames = initialColumnNames;
     this.projectionColumnNames = initialColumnNames;

     projectedColumns = new ArrayList<Integer>();
@@ -183,6 +186,7 @@ public class VectorizationContext {
     if (LOG.isDebugEnabled()) {
       LOG.debug("VectorizationContext consructor contextName " + contextName + " level " + level);
     }
+    initialColumnNames = new ArrayList<String>();
     projectedColumns = new ArrayList<Integer>();
     projectionColumnNames = new ArrayList<String>();
     projectionColumnMap = new HashMap<String, Integer>();
@@ -198,6 +202,7 @@ public class VectorizationContext {
     this.contextName = contextName;
     level = vContext.level + 1;
     LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level);
+    this.initialColumnNames = vContext.initialColumnNames;
     this.projectedColumns = new ArrayList<Integer>();
     this.projectionColumnNames = new ArrayList<String>();
     this.projectionColumnMap = new HashMap<String, Integer>();
@@ -210,6 +215,7 @@ public class VectorizationContext {

   // Add an initial column to a vectorization context when
   // a vectorized row batch is being created.
   public void addInitialColumn(String columnName) {
+    initialColumnNames.add(columnName);
     int index = projectedColumns.size();
     projectedColumns.add(index);
     projectionColumnNames.add(columnName);
@@ -238,6 +244,10 @@ public class VectorizationContext {
     projectionColumnMap.put(columnName, vectorBatchColIndex);
   }

+  public List<String> getInitialColumnNames() {
+    return initialColumnNames;
+  }
+
   public List<Integer> getProjectedColumns() {
     return projectedColumns;
   }
@@ -1038,7 +1048,9 @@ public class VectorizationContext {
     VectorExpressionDescriptor.Descriptor descriptor = builder.build();
     Class<?> vclass = this.vMap.getVectorExpressionClass(udfClass, descriptor);
     if (vclass == null) {
-      LOG.info("No vector udf found for " + udfClass.getSimpleName() + ", descriptor: " + descriptor);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No vector udf found for "+udfClass.getSimpleName() + ", descriptor: "+descriptor);
+      }
       return null;
     }
     Mode childrenMode = getChildrenMode(mode, udfClass);
@@ -2337,11 +2349,11 @@ public class VectorizationContext {
       return ColumnVector.Type.DECIMAL;

     default:
-      throw new HiveException("Unexpected primitive type category " + primitiveCategory);
+      throw new RuntimeException("Unexpected primitive type category " + primitiveCategory);
     }
   }
   default:
-    throw new HiveException("Unexpected type category " +
+    throw new RuntimeException("Unexpected type category " +
         typeInfo.getCategory());
   }
 }
@@ -2452,13 +2464,16 @@ public class VectorizationContext {
     return firstOutputColumnIndex;
   }

-  public Map<Integer, String> getScratchColumnTypeMap() {
-    Map<Integer, String> map = new HashMap<Integer, String>();
+  public String[] getScratchColumnTypeNames() {
+    String[] result = new String[ocm.outputColCount];
     for (int i = 0; i < ocm.outputColCount; i++) {
-      String type = ocm.outputColumnsTypes[i];
-      map.put(i+this.firstOutputColumnIndex, type);
+      String typeName = ocm.outputColumnsTypes[i];
+      if (typeName.equalsIgnoreCase("long")) {
+        typeName = "bigint";   // Convert our synonym to a real Hive type name.
+      }
+      result[i] = typeName;
     }
-    return map;
+    return result;
   }
@@ -2478,9 +2493,7 @@ public class VectorizationContext {
     }
     sb.append("sorted projectionColumnMap ").append(sortedColumnMap).append(", ");

-    Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger);
-    sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap());
-    sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap);
+    sb.append("scratchColumnTypeNames ").append(getScratchColumnTypeNames().toString());

     return sb.toString();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/249c4ef1/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
index 3d6d6e0..d75d185 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java
@@ -56,9 +56,13 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -114,16 +118,24 @@ public class VectorizedBatchUtil {
     batch.size = size;
   }

-  /**
-   * Convert an ObjectInspector into a ColumnVector of the appropriate
-   * type.
-   */
-  public static ColumnVector createColumnVector(ObjectInspector inspector
-                                                ) throws HiveException {
-    switch(inspector.getCategory()) {
-    case PRIMITIVE:
-      PrimitiveObjectInspector poi = (PrimitiveObjectInspector) inspector;
-      switch(poi.getPrimitiveCategory()) {
+  public static ColumnVector createColumnVector(String typeName) {
+    typeName = typeName.toLowerCase();
+
+    // Allow undecorated CHAR and VARCHAR to support scratch column type names.
+    if (typeName.equals("char") || typeName.equals("varchar")) {
+      return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
+    }
+
+    TypeInfo typeInfo = (TypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(typeName);
+    return createColumnVector(typeInfo);
+  }
+
+  public static ColumnVector createColumnVector(TypeInfo typeInfo) {
+    switch(typeInfo.getCategory()) {
+    case PRIMITIVE:
+      {
+        PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo;
+        switch(primitiveTypeInfo.getPrimitiveCategory()) {
         case BOOLEAN:
         case BYTE:
         case SHORT:
@@ -143,142 +155,53 @@ public class VectorizedBatchUtil {
         case VARCHAR:
           return new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
         case DECIMAL:
-          DecimalTypeInfo tInfo = (DecimalTypeInfo) poi.getTypeInfo();
+          DecimalTypeInfo tInfo = (DecimalTypeInfo) primitiveTypeInfo;
           return new DecimalColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
               tInfo.precision(), tInfo.scale());
         default:
-          throw new HiveException("Vectorizaton is not supported for datatype:"
-              + poi.getPrimitiveCategory());
+          throw new RuntimeException("Vectorizaton is not supported for datatype:"
+              + primitiveTypeInfo.getPrimitiveCategory());
         }
-    case STRUCT: {
-      StructObjectInspector soi = (StructObjectInspector) inspector;
-      List<? extends StructField> fieldList = soi.getAllStructFieldRefs();
-      ColumnVector[] children = new ColumnVector[fieldList.size()];
+      }
+    case STRUCT:
+      {
+        StructTypeInfo structTypeInfo = (StructTypeInfo) typeInfo;
+        List<TypeInfo> typeInfoList = structTypeInfo.getAllStructFieldTypeInfos();
+        ColumnVector[] children = new ColumnVector[typeInfoList.size()];
         for(int i=0; i < children.length; ++i) {
           children[i] =
-              createColumnVector(fieldList.get(i).getFieldObjectInspector());
+              createColumnVector(typeInfoList.get(i));
         }
         return new StructColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
             children);
       }
-    case UNION: {
-      UnionObjectInspector uoi = (UnionObjectInspector) inspector;
-      List<ObjectInspector> fieldList = uoi.getObjectInspectors();
-      ColumnVector[] children = new ColumnVector[fieldList.size()];
+    case UNION:
+      {
+        UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo;
+        List<TypeInfo> typeInfoList = unionTypeInfo.getAllUnionObjectTypeInfos();
+        ColumnVector[] children = new ColumnVector[typeInfoList.size()];
         for(int i=0; i < children.length; ++i) {
-          children[i] = createColumnVector(fieldList.get(i));
+          children[i] = createColumnVector(typeInfoList.get(i));
         }
         return new UnionColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
             children);
       }
-    case LIST: {
-      ListObjectInspector loi = (ListObjectInspector) inspector;
+    case LIST:
+      {
+        ListTypeInfo listTypeInfo = (ListTypeInfo) typeInfo;
         return new ListColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            createColumnVector(loi.getListElementObjectInspector()));
+            createColumnVector(listTypeInfo.getListElementTypeInfo()));
       }
-    case MAP: {
-      MapObjectInspector moi = (MapObjectInspector) inspector;
+    case MAP:
+      {
+        MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
         return new MapColumnVector(VectorizedRowBatch.DEFAULT_SIZE,
-            createColumnVector(moi.getMapKeyObjectInspector()),
-            createColumnVector(moi.getMapValueObjectInspector()));
+            createColumnVector(mapTypeInfo.getMapKeyTypeInfo()),
+            createColumnVector(mapTypeInfo.getMapValueTypeInfo()));
       }
-    default:
-      throw new HiveException("Vectorization is not supported for datatype:"
-          + inspector.getCategory());
-    }
-
-  }
-
-  /**
-   * Walk through the object inspector and add column vectors
-   *
-   * @param oi
-   * @param cvList
-   *          ColumnVectors are populated in this list
-   */
-  private static void allocateColumnVector(StructObjectInspector oi,
-      List<ColumnVector> cvList) throws HiveException {
-    if (cvList == null) {
-      throw new HiveException("Null columnvector list");
-    }
-    if (oi == null) {
-      return;
-    }
-    final List<? extends StructField> fields = oi.getAllStructFieldRefs();
-    for(StructField field : fields) {
-      ObjectInspector fieldObjectInspector = field.getFieldObjectInspector();
-      cvList.add(createColumnVector(fieldObjectInspector));
-    }
-  }
-
-
-  /**
-   * Create VectorizedRowBatch from ObjectInspector
-   *
-   * @param oi
-   * @return
-   * @throws HiveException
-   */
-  public static VectorizedRowBatch constructVectorizedRowBatch(
-      StructObjectInspector oi) throws HiveException {
-    final List<ColumnVector> cvList = new LinkedList<ColumnVector>();
-    allocateColumnVector(oi, cvList);
-    final VectorizedRowBatch result = new VectorizedRowBatch(cvList.size());
-    int i = 0;
-    for(ColumnVector cv : cvList) {
-      result.cols[i++] = cv;
-    }
-    return result;
-  }
-
-  /**
-   * Create VectorizedRowBatch from key and value object inspectors
-   * The row object inspector used by ReduceWork needs to be a **standard**
-   * struct object inspector, not just any struct object inspector.
-   * @param keyInspector
-   * @param valueInspector
-   * @param vectorScratchColumnTypeMap
-   * @return VectorizedRowBatch, OI
-   * @throws HiveException
-   */
-  public static ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> constructVectorizedRowBatch(
-      StructObjectInspector keyInspector, StructObjectInspector valueInspector, Map<Integer, String> vectorScratchColumnTypeMap)
-          throws HiveException {
-
-    ArrayList<String> colNames = new ArrayList<String>();
-    ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
-    List<? extends StructField> fields = keyInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
-    }
-    fields = valueInspector.getAllStructFieldRefs();
-    for (StructField field: fields) {
-      colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
-      ois.add(field.getFieldObjectInspector());
+    default:
+      throw new RuntimeException("Vectorization is not supported for datatype:"
+          + typeInfo.getCategory());
     }
-    StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois);
-
-    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(vectorScratchColumnTypeMap, rowObjectInspector);
-    return new ObjectPair<>(batchContext.createVectorizedRowBatch(), rowObjectInspector);
-  }
-
-  /**
-   * Iterates through all columns in a given row and populates the batch
-   *
-   * @param row
-   * @param oi
-   * @param rowIndex
-   * @param batch
-   * @param buffer
-   * @throws HiveException
-   */
-  public static void addRowToBatch(Object row, StructObjectInspector oi,
-                                   int rowIndex,
-                                   VectorizedRowBatch batch,
-                                   DataOutputBuffer buffer
-                                   ) throws HiveException {
-    addRowToBatchFrom(row, oi, rowIndex, 0, batch, buffer);
   }

   /**
@@ -621,31 +544,30 @@ public class VectorizedBatchUtil {
     return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids);
   }

-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector(
+  public static String[] columnNamesFromStructObjectInspector(
       StructObjectInspector structObjectInspector) throws HiveException {

     List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()];
+    String[] result = new String[fields.size()];

     int i = 0;
     for(StructField field : fields) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(
-          field.getFieldObjectInspector().getTypeName());
-      result[i++] = (PrimitiveTypeInfo) typeInfo;
+      result[i++] = field.getFieldName();
     }
     return result;
   }

-  public static PrimitiveTypeInfo[] primitiveTypeInfosFromTypeNames(
-      String[] typeNames) throws HiveException {
-
-    PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[typeNames.length];
+  public static TypeInfo[] typeInfosFromTypeNames(String[] typeNames) throws HiveException {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromTypeNames(Arrays.asList(typeNames));
+    return typeInfoList.toArray(new TypeInfo[0]);
+  }

-    for(int i = 0; i < typeNames.length; i++) {
-      TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames[i]);
-      result[i] = (PrimitiveTypeInfo) typeInfo;
-    }
-    return result;
+  public static TypeInfo[] typeInfosFromStructObjectInspector(
+      StructObjectInspector structObjectInspector) {
+    ArrayList<TypeInfo> typeInfoList =
+        TypeInfoUtils.typeInfosFromStructObjectInspector(structObjectInspector);
+    return typeInfoList.toArray(new TypeInfo[0]);
   }

   static ColumnVector cloneColumnVector(ColumnVector source
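
Editor's note (not part of the commit): the VectorizedBatchUtil hunks above replace the ObjectInspector-driven createColumnVector with TypeInfo-driven overloads. A minimal sketch of the new call pattern, using only the signatures visible in the diff; the type names below are made up for illustration:

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

    public class CreateColumnVectorSketch {
      public static void main(String[] args) {
        // String overload: lower-cases the name and maps undecorated
        // "char"/"varchar" scratch types directly to a BytesColumnVector.
        ColumnVector decimalCol = VectorizedBatchUtil.createColumnVector("decimal(10,2)");

        // TypeInfo overload: complex categories (STRUCT/UNION/LIST/MAP) recurse
        // over child TypeInfos instead of child ObjectInspectors.
        TypeInfo mapType = TypeInfoUtils.getTypeInfoFromTypeString("map<string,bigint>");
        ColumnVector mapCol = VectorizedBatchUtil.createColumnVector(mapType);

        System.out.println(decimalCol.getClass().getSimpleName()); // DecimalColumnVector
        System.out.println(mapCol.getClass().getSimpleName());     // MapColumnVector
      }
    }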

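Editor's note (not part of the commit): the operator and reduce-side hunks above share one pattern: the index-keyed scratch column type map is gone, and a VectorizedRowBatchCtx initialized from the output ObjectInspector plus VectorizationContext.getScratchColumnTypeNames() now hands out the batch. A hedged sketch of that pattern, assuming an outputObjInspector and vOutContext like those in the hunks:

    // Sketch only; mirrors the initialization shown in VectorGroupByOperator,
    // VectorMapJoinBaseOperator and VectorSMBMapJoinOperator above.
    VectorizedRowBatchCtx vrbCtx = new VectorizedRowBatchCtx();
    vrbCtx.init((StructObjectInspector) outputObjInspector,
        vOutContext.getScratchColumnTypeNames());
    VectorizedRowBatch outputBatch = vrbCtx.createVectorizedRowBatch();

On the reduce side the same context comes from the plan (gWork.getVectorizedRowBatchCtx() / redWork.getVectorizedRowBatchCtx()), and the row ObjectInspector is built separately via Utilities.constructVectorizedReduceRowOI(keyStructInspector, valueStructInspectors), as shown in the SparkReduceRecordHandler and ReduceRecordSource hunks.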