HIVE-12625: Backport to branch-1 HIVE-11981 ORC Schema Evolution Issues (Vectorized, ACID, and Non-Vectorized) (Matt McCline, reviewed by Prasanth J)
HIVE-12728: Apply DDL restrictions for ORC schema evolution (Prasanth Jayachandran reviewed by Matt McCline and Gunther Hagleitner)
HIVE-12799: Always use Schema Evolution for ACID (Matt McCline, reviewed by Sergey Shelukhin)
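For context, a minimal HiveQL sketch of what this change permits and rejects on an ORC table (illustrative only: the table and column names are hypothetical, and the names in the trailing comments refer to the ErrorMsg entries added in this patch):

    -- Schema evolution is off by default for non-ACID tables in this patch
    -- (HIVE-12799 turns it on implicitly for ACID tables).
    SET hive.exec.schema.evolution=true;

    CREATE TABLE evolve_demo (a SMALLINT, b STRING) STORED AS ORC;

    -- Allowed: appending columns and smallint -> int -> bigint promotion.
    ALTER TABLE evolve_demo ADD COLUMNS (c INT);
    ALTER TABLE evolve_demo CHANGE a a BIGINT;

    -- Rejected by the new DDLTask checks:
    ALTER TABLE evolve_demo CHANGE b b STRING AFTER c;    -- CANNOT_REORDER_COLUMNS
    ALTER TABLE evolve_demo CHANGE a a STRING;            -- CANNOT_CHANGE_COLUMN_TYPE
    ALTER TABLE evolve_demo REPLACE COLUMNS (a BIGINT);   -- REPLACE_CANNOT_DROP_COLUMNS
    ALTER TABLE evolve_demo SET FILEFORMAT TEXTFILE;      -- CANNOT_CHANGE_FILEFORMAT

Rows written before the ADD COLUMNS read back with NULL for the new column, as validated by the TestCompactor cases and the schema_evol_* q-files in the diff below.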
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0fd9069e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0fd9069e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0fd9069e Branch: refs/heads/branch-1 Commit: 0fd9069e9b8e6c97d2068cc449aa14c9773027ca Parents: 9b5f1ff Author: Matt McCline <[email protected]> Authored: Tue Jan 12 09:55:57 2016 -0800 Committer: Matt McCline <[email protected]> Committed: Tue Jan 12 09:55:57 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 4 + .../mapreduce/FosterStorageHandler.java | 37 + .../hive/hcatalog/mapreduce/InputJobInfo.java | 8 +- .../hive/hcatalog/streaming/TestStreaming.java | 2 + .../hive/ql/txn/compactor/TestCompactor.java | 246 ++- .../test/resources/testconfiguration.properties | 16 + .../org/apache/hadoop/hive/ql/ErrorMsg.java | 13 +- .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 102 +- .../hadoop/hive/ql/exec/FetchOperator.java | 8 + .../apache/hadoop/hive/ql/exec/MapOperator.java | 25 +- .../hadoop/hive/ql/exec/TableScanOperator.java | 20 + .../apache/hadoop/hive/ql/exec/Utilities.java | 110 +- .../ql/exec/spark/SparkReduceRecordHandler.java | 11 +- .../hive/ql/exec/tez/ReduceRecordProcessor.java | 2 +- .../hive/ql/exec/tez/ReduceRecordSource.java | 15 +- .../hive/ql/exec/vector/BytesColumnVector.java | 99 +- .../hive/ql/exec/vector/ColumnVector.java | 92 +- .../ql/exec/vector/DecimalColumnVector.java | 82 +- .../hive/ql/exec/vector/DoubleColumnVector.java | 67 +- .../hive/ql/exec/vector/LongColumnVector.java | 73 +- .../ql/exec/vector/VectorDeserializeRow.java | 37 +- .../hive/ql/exec/vector/VectorExtractRow.java | 11 +- .../ql/exec/vector/VectorGroupByOperator.java | 2 +- .../exec/vector/VectorMapJoinBaseOperator.java | 4 +- .../exec/vector/VectorSMBMapJoinOperator.java | 2 +- .../hive/ql/exec/vector/VectorSerializeRow.java | 12 +- .../ql/exec/vector/VectorizationContext.java | 113 +- .../ql/exec/vector/VectorizedBatchUtil.java | 201 +- .../ql/exec/vector/VectorizedColumnarSerDe.java | 277 --- .../hive/ql/exec/vector/VectorizedRowBatch.java | 20 + .../ql/exec/vector/VectorizedRowBatchCtx.java | 556 ++---- .../mapjoin/VectorMapJoinCommonOperator.java | 65 +- .../VectorMapJoinGenerateResultOperator.java | 28 +- .../org/apache/hadoop/hive/ql/io/AcidUtils.java | 19 + .../hadoop/hive/ql/io/HiveInputFormat.java | 5 + .../apache/hadoop/hive/ql/io/IOConstants.java | 11 + .../io/SelfDescribingInputFormatInterface.java | 27 + .../hive/ql/io/VectorizedRCFileInputFormat.java | 81 - .../ql/io/VectorizedRCFileRecordReader.java | 261 --- .../ql/io/orc/ConversionTreeReaderFactory.java | 38 - .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 67 +- .../hive/ql/io/orc/OrcRawRecordMerger.java | 62 +- .../apache/hadoop/hive/ql/io/orc/OrcUtils.java | 562 ++++++ .../apache/hadoop/hive/ql/io/orc/Reader.java | 15 + .../hive/ql/io/orc/RecordReaderFactory.java | 269 --- .../hadoop/hive/ql/io/orc/RecordReaderImpl.java | 35 +- .../hadoop/hive/ql/io/orc/SchemaEvolution.java | 185 ++ .../hive/ql/io/orc/TreeReaderFactory.java | 188 +- .../hadoop/hive/ql/io/orc/TypeDescription.java | 514 ++++++ .../ql/io/orc/VectorizedOrcAcidRowReader.java | 45 +- .../ql/io/orc/VectorizedOrcInputFormat.java | 53 +- .../parquet/VectorizedParquetInputFormat.java | 26 +- .../hive/ql/optimizer/GenMapRedUtils.java | 18 + .../hive/ql/optimizer/SimpleFetchOptimizer.java | 1 + .../hive/ql/optimizer/physical/Vectorizer.java | 501 +++-- 
.../ql/optimizer/physical/Vectorizer.java.orig | 1744 ------------------ .../apache/hadoop/hive/ql/plan/BaseWork.java | 30 +- .../hadoop/hive/ql/plan/PartitionDesc.java | 15 +- .../hive/ql/plan/VectorPartitionConversion.java | 166 ++ .../hive/ql/plan/VectorPartitionDesc.java | 110 ++ .../ql/exec/vector/TestVectorRowObject.java | 5 +- .../hive/ql/exec/vector/TestVectorSerDeRow.java | 9 +- .../exec/vector/TestVectorizedRowBatchCtx.java | 355 ---- .../hive/ql/io/orc/TestInputOutputFormat.java | 49 +- .../hive/ql/io/orc/TestOrcRawRecordMerger.java | 42 + .../ql/io/orc/TestOrcRawRecordMerger.java.orig | 1150 ++++++++++++ .../hive/ql/io/orc/TestVectorizedORCReader.java | 75 +- .../clientnegative/orc_change_fileformat.q | 3 + .../clientnegative/orc_change_fileformat_acid.q | 3 + .../queries/clientnegative/orc_change_serde.q | 3 + .../clientnegative/orc_change_serde_acid.q | 3 + .../clientnegative/orc_reorder_columns1.q | 3 + .../clientnegative/orc_reorder_columns1_acid.q | 3 + .../clientnegative/orc_reorder_columns2.q | 3 + .../clientnegative/orc_reorder_columns2_acid.q | 3 + .../clientnegative/orc_replace_columns1.q | 3 + .../clientnegative/orc_replace_columns1_acid.q | 3 + .../clientnegative/orc_replace_columns2.q | 3 + .../clientnegative/orc_replace_columns2_acid.q | 3 + .../clientnegative/orc_replace_columns3.q | 4 + .../clientnegative/orc_replace_columns3_acid.q | 4 + .../clientnegative/orc_type_promotion1.q | 3 + .../clientnegative/orc_type_promotion1_acid.q | 3 + .../clientnegative/orc_type_promotion2.q | 10 + .../clientnegative/orc_type_promotion2_acid.q | 10 + .../clientnegative/orc_type_promotion3.q | 3 + .../clientnegative/orc_type_promotion3_acid.q | 3 + .../test/queries/clientpositive/dbtxnmgr_ddl1.q | 1 - .../test/queries/clientpositive/load_orc_part.q | 10 - .../clientpositive/orc_int_type_promotion.q | 14 +- .../clientpositive/orc_schema_evolution.q | 39 + .../schema_evol_orc_acid_mapwork_part.q | 173 ++ .../schema_evol_orc_acid_mapwork_table.q | 131 ++ .../schema_evol_orc_acidvec_mapwork_part.q | 173 ++ .../schema_evol_orc_acidvec_mapwork_table.q | 131 ++ .../schema_evol_orc_nonvec_fetchwork_part.q | 97 + .../schema_evol_orc_nonvec_fetchwork_table.q | 57 + .../schema_evol_orc_nonvec_mapwork_part.q | 97 + .../schema_evol_orc_nonvec_mapwork_table.q | 57 + .../schema_evol_orc_vec_mapwork_part.q | 97 + .../schema_evol_orc_vec_mapwork_table.q | 57 + .../schema_evol_text_fetchwork_table.q | 57 + .../schema_evol_text_mapwork_table.q | 57 + .../schema_evol_text_nonvec_fetchwork_part.q | 97 + .../schema_evol_text_nonvec_fetchwork_table.q | 67 + .../schema_evol_text_nonvec_mapwork_part.q | 97 + .../schema_evol_text_nonvec_mapwork_table.q | 67 + .../clientnegative/orc_change_fileformat.q.out | 13 + .../orc_change_fileformat_acid.q.out | 13 + .../clientnegative/orc_change_serde.q.out | 13 + .../clientnegative/orc_change_serde_acid.q.out | 13 + .../clientnegative/orc_reorder_columns1.q.out | 13 + .../orc_reorder_columns1_acid.q.out | 13 + .../clientnegative/orc_reorder_columns2.q.out | 13 + .../orc_reorder_columns2_acid.q.out | 13 + .../clientnegative/orc_replace_columns1.q.out | 13 + .../orc_replace_columns1_acid.q.out | 13 + .../clientnegative/orc_replace_columns2.q.out | 13 + .../orc_replace_columns2_acid.q.out | 13 + .../clientnegative/orc_replace_columns3.q.out | 21 + .../orc_replace_columns3_acid.q.out | 21 + .../clientnegative/orc_type_promotion1.q.out | 13 + .../orc_type_promotion1_acid.q.out | 13 + .../clientnegative/orc_type_promotion2.q.out | 69 + .../orc_type_promotion2_acid.q.out 
| 69 + .../clientnegative/orc_type_promotion3.q.out | 13 + .../orc_type_promotion3_acid.q.out | 13 + .../results/clientpositive/dbtxnmgr_ddl1.q.out | 9 - .../results/clientpositive/load_orc_part.q.out | 52 - .../clientpositive/orc_int_type_promotion.q.out | 86 +- .../clientpositive/orc_schema_evolution.q.out | 211 +++ .../schema_evol_orc_acid_mapwork_part.q.out | 1037 +++++++++++ .../schema_evol_orc_acid_mapwork_table.q.out | 651 +++++++ .../schema_evol_orc_acidvec_mapwork_part.q.out | 1037 +++++++++++ .../schema_evol_orc_acidvec_mapwork_table.q.out | 651 +++++++ .../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++ ...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++ .../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++ .../schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_vec_mapwork_table.q.out | 298 +++ .../schema_evol_text_fetchwork_table.q.out | 298 +++ .../schema_evol_text_mapwork_table.q.out | 298 +++ ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++ ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++ .../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++ .../tez/schema_evol_orc_acid_mapwork_part.q.out | 1037 +++++++++++ .../schema_evol_orc_acid_mapwork_table.q.out | 651 +++++++ .../schema_evol_orc_acidvec_mapwork_part.q.out | 1037 +++++++++++ .../schema_evol_orc_acidvec_mapwork_table.q.out | 651 +++++++ .../schema_evol_orc_nonvec_fetchwork_part.q.out | 642 +++++++ ...schema_evol_orc_nonvec_fetchwork_table.q.out | 298 +++ .../schema_evol_orc_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_orc_nonvec_mapwork_table.q.out | 298 +++ .../tez/schema_evol_orc_vec_mapwork_part.q.out | 642 +++++++ .../tez/schema_evol_orc_vec_mapwork_table.q.out | 298 +++ .../tez/schema_evol_text_fetchwork_table.q.out | 298 +++ .../tez/schema_evol_text_mapwork_table.q.out | 298 +++ ...schema_evol_text_nonvec_fetchwork_part.q.out | 642 +++++++ ...chema_evol_text_nonvec_fetchwork_table.q.out | 297 +++ .../schema_evol_text_nonvec_mapwork_part.q.out | 642 +++++++ .../schema_evol_text_nonvec_mapwork_table.q.out | 297 +++ .../tez/vector_partition_diff_num_cols.q.out | 1 + .../vector_partition_diff_num_cols.q.out | 1 + .../fast/BinarySortableDeserializeRead.java | 23 +- .../fast/BinarySortableSerializeWrite.java | 5 - .../hive/serde2/fast/DeserializeRead.java | 6 +- .../lazy/fast/LazySimpleDeserializeRead.java | 175 +- .../lazy/fast/LazySimpleSerializeWrite.java | 11 +- .../hive/serde2/lazybinary/LazyBinarySerDe.java | 29 + .../fast/LazyBinaryDeserializeRead.java | 29 +- .../fast/LazyBinarySerializeWrite.java | 3 +- .../hive/serde2/typeinfo/TypeInfoUtils.java | 36 + 174 files changed, 24596 insertions(+), 4554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e793174..787e25e 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -910,6 +910,10 @@ public class HiveConf extends Configuration { "The threshold for the input file size of the small tables; if the file size is smaller \n" + "than this 
threshold, it will try to convert the common join into map join"), + + HIVE_SCHEMA_EVOLUTION("hive.exec.schema.evolution", false, + "Use schema evolution to convert self-describing file format's data to the schema desired by the reader."), + HIVESAMPLERANDOMNUM("hive.sample.seednumber", 0, "A number used to percentage sampling. By changing this number, user will change the subsets of data sampled."), http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java index 5a95467..bc56d77 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -35,6 +36,10 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.OutputFormat; import org.apache.hive.hcatalog.common.HCatConstants; import org.apache.hive.hcatalog.common.HCatUtil; +import org.apache.hive.hcatalog.data.schema.HCatFieldSchema; +import org.apache.hive.hcatalog.data.schema.HCatSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; @@ -48,6 +53,8 @@ import java.util.Map; */ public class FosterStorageHandler extends DefaultStorageHandler { + private static final Logger LOG = LoggerFactory.getLogger(FosterStorageHandler.class); + public Configuration conf; /** The directory under which data is initially written for a non partitioned table */ protected static final String TEMP_DIR_NAME = "_TEMP"; @@ -98,6 +105,36 @@ public class FosterStorageHandler extends DefaultStorageHandler { @Override public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) { + + try { + Map<String, String> tableProperties = tableDesc.getJobProperties(); + + String jobInfoProperty = tableProperties.get(HCatConstants.HCAT_KEY_JOB_INFO); + if (jobInfoProperty != null) { + + InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(jobInfoProperty); + + HCatTableInfo tableInfo = inputJobInfo.getTableInfo(); + HCatSchema dataColumns = tableInfo.getDataColumns(); + List<HCatFieldSchema> dataFields = dataColumns.getFields(); + StringBuilder columnNamesSb = new StringBuilder(); + StringBuilder typeNamesSb = new StringBuilder(); + for (HCatFieldSchema dataField : dataFields) { + if (columnNamesSb.length() > 0) { + columnNamesSb.append(","); + typeNamesSb.append(":"); + } + columnNamesSb.append(dataField.getName()); + typeNamesSb.append(dataField.getTypeString()); + } + jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNamesSb.toString()); + jobProperties.put(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, typeNamesSb.toString()); + + } + } catch (IOException e) { + throw new IllegalStateException("Failed to set output path", e); + } + } 
@Override http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java ---------------------------------------------------------------------- diff --git a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java index 1f23f3f..7ec6ae3 100644 --- a/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java +++ b/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/InputJobInfo.java @@ -182,9 +182,11 @@ public class InputJobInfo implements Serializable { ObjectInputStream partInfoReader = new ObjectInputStream(new InflaterInputStream(ois)); partitions = (List<PartInfo>)partInfoReader.readObject(); - for (PartInfo partInfo : partitions) { - if (partInfo.getTableInfo() == null) { - partInfo.setTableInfo(this.tableInfo); + if (partitions != null) { + for (PartInfo partInfo : partitions) { + if (partInfo.getTableInfo() == null) { + partInfo.setTableInfo(this.tableInfo); + } } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java index 3458b65..ff2598f 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java @@ -464,6 +464,8 @@ public class TestStreaming { JobConf job = new JobConf(); job.set("mapred.input.dir", partitionPath.toString()); job.set("bucket_count", Integer.toString(buckets)); + job.set("columns", "id,msg"); + job.set("columns.types", "bigint:string"); job.set(ValidTxnList.VALID_TXNS_KEY, txns.toString()); InputSplit[] splits = inf.getSplits(job, buckets); Assert.assertEquals(buckets, splits.length); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index e2910dd..dabe434 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -128,7 +128,194 @@ public class TestCompactor { driver.close(); } } - + + /** + * Simple schema evolution add columns with partitioning. + * @throws Exception + */ + @Test + public void schemaEvolutionAddColDynamicPartitioningInsert() throws Exception { + String tblName = "dpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + + // First INSERT round. 
+ executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + // ALTER TABLE ... ADD COLUMNS + executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver); + + // Validate there is an added NULL for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + + // Second INSERT round with new inserts into previously existing partition 'yesterday'. + executeStatementOnDriver("insert into " + tblName + " partition (ds) values " + + "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " + + "(5, 'doc', 1902, 'yesterday')", + driver); + + // Validate there the new insertions for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 0); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(4, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=last_century", names.get(0)); + Assert.assertEquals("ds=soon", names.get(1)); + Assert.assertEquals("ds=today", names.get(2)); + Assert.assertEquals("ds=yesterday", names.get(3)); + + // Validate after compaction. 
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tfred\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\twilma\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + } + + @Test + public void schemaEvolutionAddColDynamicPartitioningUpdate() throws Exception { + String tblName = "udpct"; + List<String> colNames = Arrays.asList("a", "b"); + executeStatementOnDriver("drop table if exists " + tblName, driver); + executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + + " PARTITIONED BY(ds string)" + + " CLUSTERED BY(a) INTO 2 BUCKETS" + //currently ACID requires table to be bucketed + " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver); + executeStatementOnDriver("insert into " + tblName + " partition (ds) values (1, 'fred', " + + "'today'), (2, 'wilma', 'yesterday')", driver); + + executeStatementOnDriver("update " + tblName + " set b = 'barney'", driver); + + // Validate the update. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + ArrayList<String> valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tyesterday", valuesReadFromHiveDriver.get(1)); + + // ALTER TABLE ... ADD COLUMNS + executeStatementOnDriver("ALTER TABLE " + tblName + " ADD COLUMNS(c int)", driver); + + // Validate there is an added NULL for column c. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(2, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + + // Second INSERT round with new inserts into previously existing partition 'yesterday'. + executeStatementOnDriver("insert into " + tblName + " partition (ds) values " + + "(3, 'mark', 1900, 'soon'), (4, 'douglas', 1901, 'last_century'), " + + "(5, 'doc', 1902, 'yesterday')", + driver); + + // Validate there the new insertions for column c. 
+ executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\tNULL\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\tNULL\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t1900\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t1901\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t1902\tyesterday", valuesReadFromHiveDriver.get(4)); + + executeStatementOnDriver("update " + tblName + " set c = 2000", driver); + + // Validate the update of new column c, even in old rows. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4)); + + Initiator initiator = new Initiator(); + initiator.setThreadId((int)initiator.getId()); + // Set to 1 so insert doesn't set it off but update does + conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD, 1); + initiator.setHiveConf(conf); + AtomicBoolean stop = new AtomicBoolean(); + stop.set(true); + initiator.init(stop, new AtomicBoolean()); + initiator.run(); + + CompactionTxnHandler txnHandler = new CompactionTxnHandler(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals(4, compacts.size()); + SortedSet<String> partNames = new TreeSet<String>(); + for (int i = 0; i < compacts.size(); i++) { + Assert.assertEquals("default", compacts.get(i).getDbname()); + Assert.assertEquals(tblName, compacts.get(i).getTablename()); + Assert.assertEquals("initiated", compacts.get(i).getState()); + partNames.add(compacts.get(i).getPartitionname()); + } + List<String> names = new ArrayList<String>(partNames); + Assert.assertEquals("ds=last_century", names.get(0)); + Assert.assertEquals("ds=soon", names.get(1)); + Assert.assertEquals("ds=today", names.get(2)); + Assert.assertEquals("ds=yesterday", names.get(3)); + + // Validate after compaction. + executeStatementOnDriver("SELECT * FROM " + tblName + " ORDER BY a", driver); + valuesReadFromHiveDriver = new ArrayList<String>(); + driver.getResults(valuesReadFromHiveDriver); + Assert.assertEquals(5, valuesReadFromHiveDriver.size()); + Assert.assertEquals("1\tbarney\t2000\ttoday", valuesReadFromHiveDriver.get(0)); + Assert.assertEquals("2\tbarney\t2000\tyesterday", valuesReadFromHiveDriver.get(1)); + Assert.assertEquals("3\tmark\t2000\tsoon", valuesReadFromHiveDriver.get(2)); + Assert.assertEquals("4\tdouglas\t2000\tlast_century", valuesReadFromHiveDriver.get(3)); + Assert.assertEquals("5\tdoc\t2000\tyesterday", valuesReadFromHiveDriver.get(4)); + } + /** * After each major compaction, stats need to be updated on each column of the * table/partition which previously had stats. 
@@ -255,7 +442,9 @@ public class TestCompactor { t.run(); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals(1, compacts.size()); + if (1 != compacts.size()) { + Assert.fail("Expecting 1 file and found " + compacts.size() + " files " + compacts.toString()); + } Assert.assertEquals("ready for cleaning", compacts.get(0).getState()); stats = msClient.getPartitionColumnStatistics(ci.dbname, ci.tableName, @@ -409,6 +598,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -452,9 +643,12 @@ public class TestCompactor { } } Arrays.sort(names); - Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", - "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}); - checkExpectedTxnsPresent(null, new Path[]{resultFile}, 0, 1L, 4L); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); @@ -466,6 +660,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -500,10 +696,12 @@ public class TestCompactor { FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); - Assert.assertEquals(1, stat.length); + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); + } String name = stat[0].getPath().getName(); Assert.assertEquals(name, "base_0000004"); - checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -514,6 +712,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -561,9 +761,12 @@ public class TestCompactor { } } Arrays.sort(names); - Assert.assertArrayEquals(names, new String[]{"delta_0000001_0000002", - "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}); - checkExpectedTxnsPresent(null, new 
Path[]{resultDelta}, 0, 1L, 4L); + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000006", "delta_0000003_0000004", "delta_0000005_0000006"}; + if (!Arrays.deepEquals(expected, names)) { + Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); + } + checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -574,6 +777,8 @@ public class TestCompactor { String dbName = "default"; String tblName = "cws"; List<String> colNames = Arrays.asList("a", "b"); + String columnNamesProperty = "a,b"; + String columnTypesProperty = "int:string"; executeStatementOnDriver("drop table if exists " + tblName, driver); executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " + " CLUSTERED BY(a) INTO 1 BUCKETS" + //currently ACID requires table to be bucketed @@ -613,10 +818,17 @@ public class TestCompactor { FileSystem fs = FileSystem.get(conf); FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); - Assert.assertEquals(1, stat.length); + if (1 != stat.length) { + Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat)); + } + if (1 != stat.length) { + Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, 0, 1L, 4L); + if (!name.equals("base_0000006")) { + Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); + } + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L); } finally { connection.close(); } @@ -642,7 +854,8 @@ public class TestCompactor { } } - private void checkExpectedTxnsPresent(Path base, Path[] deltas, int bucket, long min, long max) + private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, + String columnTypesProperty, int bucket, long min, long max) throws IOException { ValidTxnList txnList = new ValidTxnList() { @Override @@ -678,8 +891,11 @@ public class TestCompactor { OrcInputFormat aif = new OrcInputFormat(); + Configuration conf = new Configuration(); + conf.set("columns", columnNamesProperty); + conf.set("columns.types", columnTypesProperty); AcidInputFormat.RawReader<OrcStruct> reader = - aif.getRawReader(new Configuration(), false, bucket, txnList, base, deltas); + aif.getRawReader(conf, false, bucket, txnList, base, deltas); RecordIdentifier identifier = reader.createKey(); OrcStruct value = reader.createValue(); long currentTxn = min; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 1c8a80d..290cff2 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -156,6 +156,22 @@ minitez.query.files.shared=acid_globallimit.q,\ ptf_matchpath.q,\ ptf_streaming.q,\ sample1.q,\ + schema_evol_text_nonvec_mapwork_table.q,\ + schema_evol_text_nonvec_fetchwork_table.q,\ + schema_evol_orc_nonvec_fetchwork_part.q,\ + schema_evol_orc_nonvec_mapwork_part.q,\ + schema_evol_text_nonvec_fetchwork_part.q,\ + 
schema_evol_text_nonvec_mapwork_part.q,\ + schema_evol_orc_acid_mapwork_part.q,\ + schema_evol_orc_acid_mapwork_table.q,\ + schema_evol_orc_acidvec_mapwork_table.q,\ + schema_evol_orc_acidvec_mapwork_part.q,\ + schema_evol_orc_vec_mapwork_part.q,\ + schema_evol_text_fetchwork_table.q,\ + schema_evol_text_mapwork_table.q,\ + schema_evol_orc_vec_mapwork_table.q,\ + schema_evol_orc_nonvec_mapwork_table.q,\ + schema_evol_orc_nonvec_fetchwork_table.q,\ selectDistinctStar.q,\ script_env_var1.q,\ script_env_var2.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java index 34461ed..7acca77 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java @@ -435,6 +435,12 @@ public enum ErrorMsg { CTAS_CREATES_VOID_TYPE(10305, "CREATE-TABLE-AS-SELECT creates a VOID type, please use CAST to specify the type, near field: "), //{2} should be lockid LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true), + CANNOT_CHANGE_SERDE(10309, "Changing SerDe (from {0}) is not supported for table {1}. File format may be incompatible", true), + CANNOT_CHANGE_FILEFORMAT(10310, "Changing file format (from {0}) is not supported for table {1}", true), + CANNOT_REORDER_COLUMNS(10311, "Reordering columns is not supported for table {0}. SerDe may be incompatible", true), + CANNOT_CHANGE_COLUMN_TYPE(10312, "Changing from type {0} to {1} is not supported for column {2}. SerDe may be incompatible", true), + REPLACE_CANNOT_DROP_COLUMNS(10313, "Replacing columns cannot drop columns for table {0}. SerDe may be incompatible", true), + REPLACE_UNSUPPORTED_TYPE_CONVERSION(10314, "Replacing columns with unsupported type conversion (from {0} to {1}) for column {2}. SerDe may be incompatible", true), //========================== 20000 range starts here ========================// SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."), SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. " @@ -497,8 +503,13 @@ public enum ErrorMsg { + "data, set " + HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA + " to true"), INVALID_FILE_FORMAT_IN_LOAD(30019, "The file that you are trying to load does not match the" + - " file format of the destination table.") + " file format of the destination table."), + SCHEMA_REQUIRED_TO_READ_ACID_TABLES(30020, "Neither the configuration variables " + + "schema.evolution.columns / schema.evolution.columns.types " + + "nor the " + + "columns / columns.types " + + "are set. 
Table schema information is required to read ACID tables") ; private int errorCode; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 2a64da3..56a0a4e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -48,6 +48,7 @@ import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.commons.lang.StringEscapeUtils; @@ -97,9 +98,12 @@ import org.apache.hadoop.hive.ql.exec.ArchiveUtils.PartSpecInfo; import org.apache.hadoop.hive.ql.exec.tez.TezTask; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.merge.MergeFileTask; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask; import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork; @@ -3291,6 +3295,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return 0; } + private boolean isSchemaEvolutionEnabled(Table tbl) { + boolean isAcid = AcidUtils.isTablePropertyTransactional(tbl.getMetadata()); + if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) { + return true; + } + return false; + } + private int alterTableOrSinglePartition(AlterTableDesc alterTbl, Table tbl, Partition part) throws HiveException { @@ -3336,6 +3348,13 @@ public class DDLTask extends Task<DDLWork> implements Serializable { String comment = alterTbl.getNewColComment(); boolean first = alterTbl.getFirst(); String afterCol = alterTbl.getAfterCol(); + // if orc table, restrict reordering columns as it will break schema evolution + boolean isOrcSchemaEvolution = + sd.getInputFormat().equals(OrcInputFormat.class.getName()) && + isSchemaEvolutionEnabled(tbl); + if (isOrcSchemaEvolution && (first || (afterCol != null && !afterCol.trim().isEmpty()))) { + throw new HiveException(ErrorMsg.CANNOT_REORDER_COLUMNS, alterTbl.getOldName()); + } FieldSchema column = null; boolean found = false; @@ -3352,6 +3371,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable { && !oldColName.equalsIgnoreCase(oldName)) { throw new HiveException(ErrorMsg.DUPLICATE_COLUMN_NAMES, newName); } else if (oldColName.equalsIgnoreCase(oldName)) { + // if orc table, restrict changing column types. Only integer type promotion is supported. 
+ // smallint -> int -> bigint + if (isOrcSchemaEvolution && !isSupportedTypeChange(col.getType(), type)) { + throw new HiveException(ErrorMsg.CANNOT_CHANGE_COLUMN_TYPE, col.getType(), type, + newName); + } col.setName(newName); if (type != null && !type.trim().equals("")) { col.setType(type); @@ -3403,9 +3428,31 @@ public class DDLTask extends Task<DDLWork> implements Serializable { && !serializationLib.equals(LazySimpleSerDe.class.getName()) && !serializationLib.equals(ColumnarSerDe.class.getName()) && !serializationLib.equals(DynamicSerDe.class.getName()) - && !serializationLib.equals(ParquetHiveSerDe.class.getName())) { + && !serializationLib.equals(ParquetHiveSerDe.class.getName()) + && !serializationLib.equals(OrcSerde.class.getName())) { throw new HiveException(ErrorMsg.CANNOT_REPLACE_COLUMNS, alterTbl.getOldName()); } + final boolean isOrcSchemaEvolution = + serializationLib.equals(OrcSerde.class.getName()) && + isSchemaEvolutionEnabled(tbl); + // adding columns and limited integer type promotion is supported for ORC schema evolution + if (isOrcSchemaEvolution) { + final List<FieldSchema> existingCols = sd.getCols(); + final List<FieldSchema> replaceCols = alterTbl.getNewCols(); + + if (replaceCols.size() < existingCols.size()) { + throw new HiveException(ErrorMsg.REPLACE_CANNOT_DROP_COLUMNS, alterTbl.getOldName()); + } + + for (int i = 0; i < existingCols.size(); i++) { + final String currentColType = existingCols.get(i).getType().toLowerCase().trim(); + final String newColType = replaceCols.get(i).getType().toLowerCase().trim(); + if (!isSupportedTypeChange(currentColType, newColType)) { + throw new HiveException(ErrorMsg.REPLACE_UNSUPPORTED_TYPE_CONVERSION, currentColType, + newColType, replaceCols.get(i).getName()); + } + } + } sd.setCols(alterTbl.getNewCols()); } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDPROPS) { tbl.getTTable().getParameters().putAll(alterTbl.getProps()); @@ -3420,6 +3467,14 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDSERDE) { StorageDescriptor sd = (part == null ? tbl.getTTable().getSd() : part.getTPartition().getSd()); String serdeName = alterTbl.getSerdeName(); + String oldSerdeName = sd.getSerdeInfo().getSerializationLib(); + // if orc table, restrict changing the serde as it can break schema evolution + if (isSchemaEvolutionEnabled(tbl) && + oldSerdeName.equalsIgnoreCase(OrcSerde.class.getName()) && + !serdeName.equalsIgnoreCase(OrcSerde.class.getName())) { + throw new HiveException(ErrorMsg.CANNOT_CHANGE_SERDE, OrcSerde.class.getSimpleName(), + alterTbl.getOldName()); + } sd.getSerdeInfo().setSerializationLib(serdeName); if ((alterTbl.getProps() != null) && (alterTbl.getProps().size() > 0)) { sd.getSerdeInfo().getParameters().putAll(alterTbl.getProps()); @@ -3434,6 +3489,12 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } } else if (alterTbl.getOp() == AlterTableDesc.AlterTableTypes.ADDFILEFORMAT) { StorageDescriptor sd = (part == null ? 
tbl.getTTable().getSd() : part.getTPartition().getSd()); + // if orc table, restrict changing the file format as it can break schema evolution + if (isSchemaEvolutionEnabled(tbl) && + sd.getInputFormat().equals(OrcInputFormat.class.getName()) + && !alterTbl.getInputFormat().equals(OrcInputFormat.class.getName())) { + throw new HiveException(ErrorMsg.CANNOT_CHANGE_FILEFORMAT, "ORC", alterTbl.getOldName()); + } sd.setInputFormat(alterTbl.getInputFormat()); sd.setOutputFormat(alterTbl.getOutputFormat()); if (alterTbl.getSerdeName() != null) { @@ -3554,6 +3615,45 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return 0; } + + // don't change the order of enums as ordinal values are used to check for valid type promotions + enum PromotableTypes { + SMALLINT, + INT, + BIGINT; + + static List<String> types() { + return ImmutableList.of(SMALLINT.toString().toLowerCase(), + INT.toString().toLowerCase(), BIGINT.toString().toLowerCase()); + } + } + + // for ORC, only supported type promotions are smallint -> int -> bigint. No other + // type promotions are supported at this point + private boolean isSupportedTypeChange(String currentType, String newType) { + if (currentType != null && newType != null) { + currentType = currentType.toLowerCase().trim(); + newType = newType.toLowerCase().trim(); + // no type change + if (currentType.equals(newType)) { + return true; + } + if (PromotableTypes.types().contains(currentType) + && PromotableTypes.types().contains(newType)) { + PromotableTypes pCurrentType = PromotableTypes.valueOf(currentType.toUpperCase()); + PromotableTypes pNewType = PromotableTypes.valueOf(newType.toUpperCase()); + if (pNewType.ordinal() >= pCurrentType.ordinal()) { + return true; + } else { + return false; + } + } else { + return false; + } + } + return true; + } + /** * Drop a given table or some partitions. DropTableDesc is currently used for both. 
* http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 258d28e..b6e5739 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -133,6 +133,10 @@ public class FetchOperator implements Serializable { this.job = job; this.work = work; this.operator = operator; + if (operator instanceof TableScanOperator) { + Utilities.addTableSchemaToConf(job, + (TableScanOperator) operator); + } this.vcCols = vcCols; this.hasVC = vcCols != null && !vcCols.isEmpty(); this.isStatReader = work.getTblDesc() == null; @@ -598,6 +602,10 @@ public class FetchOperator implements Serializable { } private boolean needConversion(PartitionDesc partitionDesc) { + boolean isAcid = AcidUtils.isTablePropertyTransactional(partitionDesc.getTableDesc().getProperties()); + if (Utilities.isSchemaEvolutionEnabled(job, isAcid) && Utilities.isInputFileFormatSelfDescribing(partitionDesc)) { + return false; + } return needConversion(partitionDesc.getTableDesc(), Arrays.asList(partitionDesc)); } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java index f8717ae..afc03ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java @@ -38,8 +38,10 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.MapOperator.MapOpCtx; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.metadata.VirtualColumn; import org.apache.hadoop.hive.ql.plan.MapWork; @@ -63,6 +65,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.util.StringUtils; @@ -200,8 +203,14 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon opCtx.partName = String.valueOf(partSpec); opCtx.deserializer = pd.getDeserializer(hconf); - StructObjectInspector partRawRowObjectInspector = - (StructObjectInspector) opCtx.deserializer.getObjectInspector(); + StructObjectInspector partRawRowObjectInspector; + boolean isAcid = AcidUtils.isTablePropertyTransactional(td.getProperties()); + if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) { + partRawRowObjectInspector = tableRowOI; + } else { + partRawRowObjectInspector = + (StructObjectInspector) opCtx.deserializer.getObjectInspector(); + } opCtx.partTblObjectInspectorConverter = 
ObjectInspectorConverters.getConverter(partRawRowObjectInspector, tableRowOI); @@ -302,8 +311,16 @@ public class MapOperator extends Operator<MapWork> implements Serializable, Clon PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile); TableDesc tableDesc = pd.getTableDesc(); Deserializer partDeserializer = pd.getDeserializer(hconf); - StructObjectInspector partRawRowObjectInspector = - (StructObjectInspector) partDeserializer.getObjectInspector(); + + StructObjectInspector partRawRowObjectInspector; + boolean isAcid = AcidUtils.isTablePropertyTransactional(tableDesc.getProperties()); + if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) { + Deserializer tblDeserializer = tableDesc.getDeserializer(hconf); + partRawRowObjectInspector = (StructObjectInspector) tblDeserializer.getObjectInspector(); + } else { + partRawRowObjectInspector = + (StructObjectInspector) partDeserializer.getObjectInspector(); + } StructObjectInspector tblRawRowObjectInspector = tableDescOI.get(tableDesc); if ((tblRawRowObjectInspector == null) || http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java index cbf02e9..d98ea84 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java @@ -70,6 +70,13 @@ public class TableScanOperator extends Operator<TableScanDesc> implements private String defaultPartitionName; + /** + * These values are saved during MapWork, FetchWork, etc preparation and later added to the the + * JobConf of each task. + */ + private String schemaEvolutionColumns; + private String schemaEvolutionColumnsTypes; + public TableDesc getTableDesc() { return tableDesc; } @@ -78,6 +85,19 @@ public class TableScanOperator extends Operator<TableScanDesc> implements this.tableDesc = tableDesc; } + public void setSchemaEvolution(String schemaEvolutionColumns, String schemaEvolutionColumnsTypes) { + this.schemaEvolutionColumns = schemaEvolutionColumns; + this.schemaEvolutionColumnsTypes = schemaEvolutionColumnsTypes; + } + + public String getSchemaEvolutionColumns() { + return schemaEvolutionColumns; + } + + public String getSchemaEvolutionColumnsTypes() { + return schemaEvolutionColumnsTypes; + } + /** * Other than gathering statistics for the ANALYZE command, the table scan operator * does not do anything special other than just forwarding the row. 
Since the table http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 3352b49..1d8e3b1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -107,6 +107,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; @@ -122,15 +123,19 @@ import org.apache.hadoop.hive.ql.exec.mr.MapRedTask; import org.apache.hadoop.hive.ql.exec.spark.SparkTask; import org.apache.hadoop.hive.ql.exec.tez.DagUtils; import org.apache.hadoop.hive.ql.exec.tez.TezTask; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat; import org.apache.hadoop.hive.ql.io.RCFile; import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper; import org.apache.hadoop.hive.ql.io.merge.MergeFileWork; import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper; @@ -175,6 +180,12 @@ import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantListObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantMapObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StandardConstantStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.SequenceFile; @@ -478,11 +489,6 @@ public final class Utilities { } } - public static Map<Integer, String> getMapWorkVectorScratchColumnTypeMap(Configuration hiveConf) { - MapWork mapWork = getMapWork(hiveConf); - return mapWork.getVectorScratchColumnTypeMap(); - } - public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) { try { Graph stageGraph = plan.getQueryPlan().getStageGraph(); @@ -3901,6 +3907,27 @@ public final class Utilities { return false; } + /** + * @param 
conf + * @return the configured VectorizedRowBatchCtx for a MapWork task. + */ + public static VectorizedRowBatchCtx getVectorizedRowBatchCtx(Configuration conf) { + VectorizedRowBatchCtx result = null; + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) && + Utilities.getPlanPath(conf) != null) { + MapWork mapWork = Utilities.getMapWork(conf); + if (mapWork != null && mapWork.getVectorMode()) { + result = mapWork.getVectorizedRowBatchCtx(); + } + } + return result; + } + + public static boolean isVectorMode(Configuration conf, MapWork mapWork) { + return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED) + && mapWork.getVectorMode(); + } + public static void clearWorkMapForConf(Configuration conf) { // Remove cached query plans for the current query only Path mapPath = getPlanPath(conf, MAP_PLAN_NAME); @@ -4057,4 +4084,77 @@ public final class Utilities { (loggingLevel.equalsIgnoreCase("PERFORMANCE") || loggingLevel.equalsIgnoreCase("VERBOSE")); } + public static boolean isSchemaEvolutionEnabled(Configuration conf, boolean isAcid) { + return isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION); + } + + public static boolean isInputFileFormatSelfDescribing(PartitionDesc pd) { + Class<?> inputFormatClass = pd.getInputFileFormatClass(); + return SelfDescribingInputFormatInterface.class.isAssignableFrom(inputFormatClass); + } + + public static boolean isInputFileFormatVectorized(PartitionDesc pd) { + Class<?> inputFormatClass = pd.getInputFileFormatClass(); + return VectorizedInputFormatInterface.class.isAssignableFrom(inputFormatClass); + } + + public static void addSchemaEvolutionToTableScanOperator(Table table, + TableScanOperator tableScanOp) { + String colNames = MetaStoreUtils.getColumnNamesFromFieldSchema(table.getSd().getCols()); + String colTypes = MetaStoreUtils.getColumnTypesFromFieldSchema(table.getSd().getCols()); + tableScanOp.setSchemaEvolution(colNames, colTypes); + } + + public static void addSchemaEvolutionToTableScanOperator(StructObjectInspector structOI, + TableScanOperator tableScanOp) { + String colNames = ObjectInspectorUtils.getFieldNames(structOI); + String colTypes = ObjectInspectorUtils.getFieldTypes(structOI); + tableScanOp.setSchemaEvolution(colNames, colTypes); + } + + public static void unsetSchemaEvolution(Configuration conf) { + conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS); + conf.unset(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES); + } + + public static void addTableSchemaToConf(Configuration conf, + TableScanOperator tableScanOp) { + String schemaEvolutionColumns = tableScanOp.getSchemaEvolutionColumns(); + if (schemaEvolutionColumns != null) { + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, tableScanOp.getSchemaEvolutionColumns()); + conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, tableScanOp.getSchemaEvolutionColumnsTypes()); + } else { + LOG.info("schema.evolution.columns and schema.evolution.columns.types not available"); + } + } + + /** + * Create row key and value object inspectors for reduce vectorization. + * The row object inspector used by ReduceWork needs to be a **standard** + * struct object inspector, not just any struct object inspector. 
+ * @param keyInspector + * @param valueInspector + * @return OI + * @throws HiveException + */ + public static StandardStructObjectInspector constructVectorizedReduceRowOI( + StructObjectInspector keyInspector, StructObjectInspector valueInspector) + throws HiveException { + + ArrayList<String> colNames = new ArrayList<String>(); + ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>(); + List<? extends StructField> fields = keyInspector.getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + fields = valueInspector.getAllStructFieldRefs(); + for (StructField field: fields) { + colNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName()); + ois.add(field.getFieldObjectInspector()); + } + StandardStructObjectInspector rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); + + return rowObjectInspector; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index ac5e3ca..f9e10c4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; @@ -153,10 +154,6 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { /* vectorization only works with struct object inspectors */ valueStructInspectors[tag] = (StructObjectInspector) valueObjectInspector[tag]; - ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = VectorizedBatchUtil. 
- constructVectorizedRowBatch(keyStructInspector, - valueStructInspectors[tag], gWork.getVectorScratchColumnTypeMap()); - batches[tag] = pair.getFirst(); final int totalColumns = keysColumnOffset + valueStructInspectors[tag].getAllStructFieldRefs().size(); valueStringWriters[tag] = new ArrayList<VectorExpressionWriter>(totalColumns); @@ -165,7 +162,11 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { valueStringWriters[tag].addAll(Arrays.asList(VectorExpressionWriterFactory .genVectorStructExpressionWritables(valueStructInspectors[tag]))); - rowObjectInspector[tag] = pair.getSecond(); + rowObjectInspector[tag] = Utilities.constructVectorizedReduceRowOI(keyStructInspector, + valueStructInspectors[tag]); + batches[tag] = gWork.getVectorizedRowBatchCtx().createVectorizedRowBatch(); + + } else { ois.add(keyObjectInspector); ois.add(valueObjectInspector[tag]); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index d649672..5edd587 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@ -237,7 +237,7 @@ public class ReduceRecordProcessor extends RecordProcessor{ boolean vectorizedRecordSource = (tag == bigTablePosition) && redWork.getVectorMode(); sources[tag].init(jconf, redWork.getReducer(), vectorizedRecordSource, keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getVectorScratchColumnTypeMap()); + redWork.getVectorizedRowBatchCtx()); ois[tag] = sources[tag].getObjectInspector(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index b634877..b1d2f52 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory; import org.apache.hadoop.hive.ql.log.PerfLogger; @@ -51,7 +52,6 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption; -import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.BytesWritable; import 
org.apache.hadoop.mapred.JobConf; @@ -123,7 +123,7 @@ public class ReduceRecordSource implements RecordSource { void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, Reader reader, boolean handleGroupKey, byte tag, - Map<Integer, String> vectorScratchColumnTypeMap) + VectorizedRowBatchCtx batchContext) throws Exception { ObjectInspector keyObjectInspector; @@ -174,10 +174,9 @@ public class ReduceRecordSource implements RecordSource { .asList(VectorExpressionWriterFactory .genVectorStructExpressionWritables(valueStructInspectors))); - ObjectPair<VectorizedRowBatch, StandardStructObjectInspector> pair = - VectorizedBatchUtil.constructVectorizedRowBatch(keyStructInspector, valueStructInspectors, vectorScratchColumnTypeMap); - rowObjectInspector = pair.getSecond(); - batch = pair.getFirst(); + rowObjectInspector = Utilities.constructVectorizedReduceRowOI(keyStructInspector, + valueStructInspectors); + batch = batchContext.createVectorizedRowBatch(); // Setup vectorized deserialization for the key and value. BinarySortableSerDe binarySortableSerDe = (BinarySortableSerDe) inputKeyDeserializer; @@ -185,7 +184,7 @@ public class ReduceRecordSource implements RecordSource { keyBinarySortableDeserializeToRow = new VectorDeserializeRow( new BinarySortableDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + VectorizedBatchUtil.typeInfosFromStructObjectInspector( keyStructInspector), binarySortableSerDe.getSortOrders())); keyBinarySortableDeserializeToRow.init(0); @@ -195,7 +194,7 @@ public class ReduceRecordSource implements RecordSource { valueLazyBinaryDeserializeToRow = new VectorDeserializeRow( new LazyBinaryDeserializeRead( - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + VectorizedBatchUtil.typeInfosFromStructObjectInspector( valueStructInspectors))); valueLazyBinaryDeserializeToRow.init(firstValueColumnOffset); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java index 8ec7ead..99744cd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/BytesColumnVector.java @@ -18,11 +18,6 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.util.Arrays; - -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; /** * This class supports string and binary data by value reference -- i.e. each field is @@ -51,9 +46,6 @@ public class BytesColumnVector extends ColumnVector { private byte[] buffer; // optional buffer to use when actually copying in data private int nextFree; // next free position in buffer - // Reusable text object - private final Text textObject = new Text(); - // Estimate that there will be 16 bytes per entry static final int DEFAULT_BUFFER_SIZE = 16 * VectorizedRowBatch.DEFAULT_SIZE; @@ -165,6 +157,19 @@ public class BytesColumnVector extends ColumnVector { } /** + * Set a field by actually copying in to a local buffer. + * If you must actually copy data in to the array, use this method. + * DO NOT USE this method unless it's not practical to set data by reference with setRef(). 
+ * Setting data by reference tends to run a lot faster than copying data in. + * + * @param elementNum index within column vector to set + * @param sourceBuf container of source data + */ + public void setVal(int elementNum, byte[] sourceBuf) { + setVal(elementNum, sourceBuf, 0, sourceBuf.length); + } + + /** * Set a field to the concatenation of two string values. Result data is copied * into the internal buffer. * @@ -215,22 +220,6 @@ public class BytesColumnVector extends ColumnVector { buffer = newBuffer; } - @Override - public Writable getWritableObject(int index) { - if (this.isRepeating) { - index = 0; - } - Writable result = null; - if (!isNull[index] && vector[index] != null) { - textObject.clear(); - textObject.append(vector[index], start[index], length[index]); - result = textObject; - } else { - result = NullWritable.get(); - } - return result; - } - /** Copy the current object contents into the output. Only copy selected entries, * as indicated by selectedInUse and the sel array. */ @@ -294,7 +283,7 @@ public class BytesColumnVector extends ColumnVector { // Only copy data values if entry is not null. The string value // at position 0 is undefined if the position 0 value is null. - if (noNulls || (!noNulls && !isNull[0])) { + if (noNulls || !isNull[0]) { // loops start at position 1 because position 0 is already set if (selectedInUse) { @@ -320,14 +309,70 @@ public class BytesColumnVector extends ColumnVector { setRef(0, value, 0, value.length); } + // Fill the column vector with nulls + public void fillWithNulls() { + noNulls = false; + isRepeating = true; + vector[0] = null; + isNull[0] = true; + } + @Override public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) { - BytesColumnVector in = (BytesColumnVector) inputVector; - setVal(outElementNum, in.vector[inputElementNum], in.start[inputElementNum], in.length[inputElementNum]); + if (inputVector.isRepeating) { + inputElementNum = 0; + } + if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) { + isNull[outElementNum] = false; + BytesColumnVector in = (BytesColumnVector) inputVector; + setVal(outElementNum, in.vector[inputElementNum], + in.start[inputElementNum], in.length[inputElementNum]); + } else { + isNull[outElementNum] = true; + noNulls = false; + } } @Override public void init() { initBuffer(0); } + + @Override + public void stringifyValue(StringBuilder buffer, int row) { + if (isRepeating) { + row = 0; + } + if (noNulls || !isNull[row]) { + buffer.append('"'); + buffer.append(new String(this.buffer, start[row], length[row])); + buffer.append('"'); + } else { + buffer.append("null"); + } + } + + @Override + public void ensureSize(int size, boolean preserveData) { + if (size > vector.length) { + super.ensureSize(size, preserveData); + int[] oldStart = start; + start = new int[size]; + int[] oldLength = length; + length = new int[size]; + byte[][] oldVector = vector; + vector = new byte[size][]; + if (preserveData) { + if (isRepeating) { + vector[0] = oldVector[0]; + start[0] = oldStart[0]; + length[0] = oldLength[0]; + } else { + System.arraycopy(oldVector, 0, vector, 0, oldVector.length); + System.arraycopy(oldStart, 0, start, 0 , oldStart.length); + System.arraycopy(oldLength, 0, length, 0, oldLength.length); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java ---------------------------------------------------------------------- diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java index 6654166..fcb1ae9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/ColumnVector.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hive.ql.exec.vector; +import java.io.IOException; import java.util.Arrays; -import org.apache.hadoop.io.Writable; - /** * ColumnVector contains the shared structure for the sub-types, * including NULL information, and whether this vector @@ -38,10 +37,15 @@ public abstract class ColumnVector { * The current kinds of column vectors. */ public static enum Type { + NONE, // Useful when the type of column vector has not be determined yet. LONG, DOUBLE, BYTES, - DECIMAL + DECIMAL, + STRUCT, + LIST, + MAP, + UNION } /* @@ -64,8 +68,6 @@ public abstract class ColumnVector { private boolean preFlattenIsRepeating; private boolean preFlattenNoNulls; - public abstract Writable getWritableObject(int index); - /** * Constructor for super-class ColumnVector. This is not called directly, * but used to initialize inherited fields. @@ -76,28 +78,42 @@ public abstract class ColumnVector { isNull = new boolean[len]; noNulls = true; isRepeating = false; + preFlattenNoNulls = true; + preFlattenIsRepeating = false; } /** - * Resets the column to default state - * - fills the isNull array with false - * - sets noNulls to true - * - sets isRepeating to false - */ - public void reset() { - if (false == noNulls) { - Arrays.fill(isNull, false); - } - noNulls = true; - isRepeating = false; + * Resets the column to default state + * - fills the isNull array with false + * - sets noNulls to true + * - sets isRepeating to false + */ + public void reset() { + if (!noNulls) { + Arrays.fill(isNull, false); } + noNulls = true; + isRepeating = false; + preFlattenNoNulls = true; + preFlattenIsRepeating = false; + } + + /** + * Sets the isRepeating flag. Recurses over structs and unions so that the + * flags are set correctly. + * @param isRepeating + */ + public void setRepeating(boolean isRepeating) { + this.isRepeating = isRepeating; + } - abstract public void flatten(boolean selectedInUse, int[] sel, int size); + abstract public void flatten(boolean selectedInUse, int[] sel, int size); // Simplify vector by brute-force flattening noNulls if isRepeating // This can be used to reduce combinatorial explosion of code paths in VectorExpressions // with many arguments. - public void flattenRepeatingNulls(boolean selectedInUse, int[] sel, int size) { + protected void flattenRepeatingNulls(boolean selectedInUse, int[] sel, + int size) { boolean nullFillValue; @@ -120,13 +136,13 @@ public abstract class ColumnVector { noNulls = false; } - public void flattenNoNulls(boolean selectedInUse, int[] sel, int size) { + protected void flattenNoNulls(boolean selectedInUse, int[] sel, + int size) { if (noNulls) { noNulls = false; if (selectedInUse) { for (int j = 0; j < size; j++) { - int i = sel[j]; - isNull[i] = false; + isNull[sel[j]] = false; } } else { Arrays.fill(isNull, 0, size, false); @@ -155,8 +171,10 @@ public abstract class ColumnVector { /** * Set the element in this column vector from the given input vector. + * This method can assume that the output does not have isRepeating set. 
*/ - public abstract void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector); + public abstract void setElement(int outElementNum, int inputElementNum, + ColumnVector inputVector); /** * Initialize the column vector. This method can be overridden by specific column vector types. @@ -166,5 +184,33 @@ public abstract class ColumnVector { public void init() { // Do nothing by default } - } + /** + * Ensure the ColumnVector can hold at least size values. + * This method is deliberately *not* recursive because the complex types + * can easily have more (or less) children than the upper levels. + * @param size the new minimum size + * @param presesrveData should the old data be preserved? + */ + public void ensureSize(int size, boolean presesrveData) { + if (isNull.length < size) { + boolean[] oldArray = isNull; + isNull = new boolean[size]; + if (presesrveData && !noNulls) { + if (isRepeating) { + isNull[0] = oldArray[0]; + } else { + System.arraycopy(oldArray, 0, isNull, 0, oldArray.length); + } + } + } + } + + /** + * Print the value for this column into the given string builder. + * @param buffer the buffer to print into + * @param row the id of the row to print + */ + public abstract void stringifyValue(StringBuilder buffer, + int row); + } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java index 5009a42..fe8ad85 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/DecimalColumnVector.java @@ -17,14 +17,10 @@ */ package org.apache.hadoop.hive.ql.exec.vector; - import java.math.BigInteger; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; public class DecimalColumnVector extends ColumnVector { @@ -39,8 +35,6 @@ public class DecimalColumnVector extends ColumnVector { public short scale; public short precision; - private final HiveDecimalWritable writableObj = new HiveDecimalWritable(); - public DecimalColumnVector(int precision, int scale) { this(VectorizedRowBatch.DEFAULT_SIZE, precision, scale); } @@ -49,26 +43,31 @@ public class DecimalColumnVector extends ColumnVector { super(size); this.precision = (short) precision; this.scale = (short) scale; - final int len = size; - vector = new HiveDecimalWritable[len]; - for (int i = 0; i < len; i++) { + vector = new HiveDecimalWritable[size]; + for (int i = 0; i < size; i++) { vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO); } } - @Override - public Writable getWritableObject(int index) { - if (isRepeating) { - index = 0; - } - if (!noNulls && isNull[index]) { - return NullWritable.get(); + // Fill the all the vector entries with provided value + public void fill(HiveDecimal value) { + noNulls = true; + isRepeating = true; + if (vector[0] == null) { + vector[0] = new HiveDecimalWritable(value); } else { - writableObj.set(vector[index]); - return writableObj; + vector[0].set(value); } } + // Fill the column vector with nulls + public void fillWithNulls() { + noNulls = false; + isRepeating = true; + 
vector[0] = null; + isNull[0] = true; + } + @Override public void flatten(boolean selectedInUse, int[] sel, int size) { // TODO Auto-generated method stub @@ -76,12 +75,35 @@ public class DecimalColumnVector extends ColumnVector { @Override public void setElement(int outElementNum, int inputElementNum, ColumnVector inputVector) { - HiveDecimal hiveDec = ((DecimalColumnVector) inputVector).vector[inputElementNum].getHiveDecimal(precision, scale); - if (hiveDec == null) { - noNulls = false; + if (inputVector.isRepeating) { + inputElementNum = 0; + } + if (inputVector.noNulls || !inputVector.isNull[inputElementNum]) { + HiveDecimal hiveDec = + ((DecimalColumnVector) inputVector).vector[inputElementNum] + .getHiveDecimal(precision, scale); + if (hiveDec == null) { + isNull[outElementNum] = true; + noNulls = false; + } else { + isNull[outElementNum] = false; + vector[outElementNum].set(hiveDec); + } + } else { isNull[outElementNum] = true; + noNulls = false; + } + } + + @Override + public void stringifyValue(StringBuilder buffer, int row) { + if (isRepeating) { + row = 0; + } + if (noNulls || !isNull[row]) { + buffer.append(vector[row].toString()); } else { - vector[outElementNum].set(hiveDec); + buffer.append("null"); } } @@ -110,4 +132,20 @@ public class DecimalColumnVector extends ColumnVector { HiveDecimal minimumNonZeroValue = HiveDecimal.create(BigInteger.ONE, scale); vector[elementNum].set(minimumNonZeroValue); } + + @Override + public void ensureSize(int size, boolean preserveData) { + if (size > vector.length) { + super.ensureSize(size, preserveData); + HiveDecimalWritable[] oldArray = vector; + vector = new HiveDecimalWritable[size]; + if (preserveData) { + // we copy all of the values to avoid creating more objects + System.arraycopy(oldArray, 0, vector, 0 , oldArray.length); + for(int i= oldArray.length; i < vector.length; ++i) { + vector[i] = new HiveDecimalWritable(HiveDecimal.ZERO); + } + } + } + } }
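
----------------------------------------------------------------------
The new Utilities helpers in this patch gate ORC schema evolution on either the ACID flag or HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION, and publish/clear the table's column names and types (schema.evolution.columns / schema.evolution.columns.types) in the job configuration. The following standalone sketch is not part of the commit; the class name and the wiring are illustrative only, and it only uses the helper signatures visible in the hunk above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;

public class SchemaEvolutionConfSketch {
  public static void main(String[] args) {
    Configuration conf = new HiveConf();

    // ACID reads always get schema evolution; otherwise the feature is
    // gated by HiveConf.ConfVars.HIVE_SCHEMA_EVOLUTION.
    boolean enabled = Utilities.isSchemaEvolutionEnabled(conf, /* isAcid */ false);
    System.out.println("schema evolution enabled: " + enabled);

    // Clear any previously published schema.evolution.columns[.types]
    // entries so a self-describing reader does not pick up stale metadata.
    Utilities.unsetSchemaEvolution(conf);
  }
}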
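
----------------------------------------------------------------------
With this change, ReduceRecordSource and SparkReduceRecordHandler take their VectorizedRowBatch from the plan's VectorizedRowBatchCtx and build the reduce-side row object inspector with the new Utilities.constructVectorizedReduceRowOI(). The sketch below (illustrative only; the class name and the toy key/value field names are invented) shows what that OI construction looks like in isolation.

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class ReduceRowOISketch {
  public static void main(String[] args) throws HiveException {
    // Toy key and value shapes standing in for the real reduce key/value inspectors.
    StandardStructObjectInspector keyOI =
        ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("k1"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaLongObjectInspector));
    StandardStructObjectInspector valueOI =
        ObjectInspectorFactory.getStandardStructObjectInspector(
            Arrays.asList("v1", "v2"),
            Arrays.<ObjectInspector>asList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                PrimitiveObjectInspectorFactory.javaDoubleObjectInspector));

    // The resulting standard struct OI has one field per key and value column,
    // prefixed with the KEY./VALUE. reduce-field markers (e.g. KEY.k1, VALUE.v1).
    StandardStructObjectInspector rowOI =
        Utilities.constructVectorizedReduceRowOI(keyOI, valueOI);
    System.out.println(rowOI.getTypeName());
  }
}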
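
----------------------------------------------------------------------
BytesColumnVector also gains a whole-array setVal() overload, fillWithNulls(), a null-aware setElement(), stringifyValue(), and a capacity-growing ensureSize(). A minimal exercise of those additions follows; it is not part of the commit, assumes the patched class from this diff is on the classpath, and uses only the entry points shown in the hunk above.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class BytesColumnVectorGrowthSketch {
  public static void main(String[] args) {
    BytesColumnVector col = new BytesColumnVector(VectorizedRowBatch.DEFAULT_SIZE);
    col.init();                                   // allocates the internal copy buffer

    byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
    col.setVal(0, value);                         // new overload: copies the whole array

    // Grow past the original capacity while preserving existing entries
    // (the new ensureSize() copies the vector/start/length arrays).
    col.ensureSize(2 * VectorizedRowBatch.DEFAULT_SIZE, true);

    StringBuilder sb = new StringBuilder();
    col.stringifyValue(sb, 0);                    // new debug helper
    System.out.println(sb);                       // expected: "hello" (quoted)
  }
}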
