http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java index 0adbea1..e1c2f31 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/mapjoin/VectorMapJoinGenerateResultOperator.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinaryDeserializeRead; import org.apache.hadoop.hive.serde2.lazybinary.fast.LazyBinarySerializeWrite; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.hive.serde2.ByteStream.Output; /** @@ -73,7 +75,9 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC private static final Log LOG = LogFactory.getLog(VectorMapJoinGenerateResultOperator.class.getName()); private static final String CLASS_NAME = VectorMapJoinGenerateResultOperator.class.getName(); - private transient PrimitiveTypeInfo[] bigTablePrimitiveTypeInfos; + //------------------------------------------------------------------------------------------------ + + private transient TypeInfo[] bigTableTypeInfos; private transient VectorSerializeRow bigTableVectorSerializeRow; @@ -394,14 +398,14 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC private void setupSpillSerDe(VectorizedRowBatch batch) throws HiveException { - PrimitiveTypeInfo[] inputObjInspectorsTypeInfos = - VectorizedBatchUtil.primitiveTypeInfosFromStructObjectInspector( + TypeInfo[] inputObjInspectorsTypeInfos = + VectorizedBatchUtil.typeInfosFromStructObjectInspector( (StructObjectInspector) inputObjInspectors[posBigTable]); List<Integer> projectedColumns = vContext.getProjectedColumns(); int projectionSize = vContext.getProjectedColumns().size(); - List<PrimitiveTypeInfo> typeInfoList = new ArrayList<PrimitiveTypeInfo>(); + List<TypeInfo> typeInfoList = new ArrayList<TypeInfo>(); List<Integer> noNullsProjectionList = new ArrayList<Integer>(); for (int i = 0; i < projectionSize; i++) { int projectedColumn = projectedColumns.get(i); @@ -413,17 +417,19 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC int[] noNullsProjection = ArrayUtils.toPrimitive(noNullsProjectionList.toArray(new Integer[0])); int noNullsProjectionSize = noNullsProjection.length; - bigTablePrimitiveTypeInfos = typeInfoList.toArray(new PrimitiveTypeInfo[0]); + bigTableTypeInfos = typeInfoList.toArray(new TypeInfo[0]); bigTableVectorSerializeRow = - new VectorSerializeRow(new LazyBinarySerializeWrite(noNullsProjectionSize)); + new VectorSerializeRow( + new LazyBinarySerializeWrite(noNullsProjectionSize)); bigTableVectorSerializeRow.init( - bigTablePrimitiveTypeInfos, - noNullsProjectionList); + bigTableTypeInfos, + noNullsProjection); - bigTableVectorDeserializeRow = new VectorDeserializeRow( - new LazyBinaryDeserializeRead(bigTablePrimitiveTypeInfos)); + bigTableVectorDeserializeRow = + new VectorDeserializeRow( + new 
LazyBinaryDeserializeRead(bigTableTypeInfos)); bigTableVectorDeserializeRow.init(noNullsProjection); } @@ -833,4 +839,4 @@ public abstract class VectorMapJoinGenerateResultOperator extends VectorMapJoinC sb.append("]"); return sb.toString(); } -} \ No newline at end of file +}
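The hunk above generalizes the spill SerDe from PrimitiveTypeInfo[] to TypeInfo[], obtained via VectorizedBatchUtil.typeInfosFromStructObjectInspector, so complex big-table columns can be spilled as well. What follows is a minimal sketch, separate from the patch and purely illustrative (the class name and type string are invented for the example), assuming only the standard TypeInfoUtils API that the new import pulls in; it shows how a Hive type string is parsed into the TypeInfo objects that bigTableTypeInfos now holds.

import java.util.ArrayList;

import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

public class TypeInfoParseSketch {
  public static void main(String[] args) {
    // Colon-separated Hive type string; array<bigint> is the kind of complex
    // type the old PrimitiveTypeInfo[] field could not represent.
    ArrayList<TypeInfo> typeInfos =
        TypeInfoUtils.getTypeInfosFromTypeString("int:string:array<bigint>");

    // The operator keeps the parsed types as a plain TypeInfo[] (bigTableTypeInfos).
    TypeInfo[] asArray = typeInfos.toArray(new TypeInfo[0]);

    for (TypeInfo typeInfo : asArray) {
      System.out.println(typeInfo.getCategory() + " : " + typeInfo.getTypeName());
    }
  }
}

The same parsing path reappears further down in this commit: OrcUtils.typeDescriptionsFromHiveTypeProperty feeds the list returned by getTypeInfosFromTypeString through convertTypeInfo to build the ORC TypeDescription objects used for schema evolution.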
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 8f60e9d..6d4c198 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; @@ -33,6 +34,8 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Properties; import java.util.regex.Pattern; /** @@ -572,4 +575,20 @@ public class AcidUtils { original.add(stat); } } + + public static boolean isTablePropertyTransactional(Properties props) { + String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (resultStr == null) { + resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return resultStr != null && resultStr.equalsIgnoreCase("true"); + } + + public static boolean isTablePropertyTransactional(Map<String, String> parameters) { + String resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL); + if (resultStr == null) { + resultStr = parameters.get(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL.toUpperCase()); + } + return resultStr != null && resultStr.equalsIgnoreCase("true"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 2d6e752..181ce84 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -490,11 +490,16 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> // ensure filters are not set from previous pushFilters jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR); jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR); + + Utilities.unsetSchemaEvolution(jobConf); + TableScanDesc scanDesc = tableScan.getConf(); if (scanDesc == null) { return; } + Utilities.addTableSchemaToConf(jobConf, tableScan); + // construct column name list and types for reference by filter push down Utilities.setColumnNameList(jobConf, tableScan); Utilities.setColumnTypeList(jobConf, tableScan); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java index 9879dfe..8d94da8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOConstants.java @@ -36,6 +36,17 @@ public final class IOConstants { public static final String AVRO = "AVRO"; public static final String AVROFILE = 
"AVROFILE"; + /** + * The desired TABLE column names and types for input format schema evolution. + * This is different than COLUMNS and COLUMNS_TYPES, which are based on individual partition + * metadata. + * + * Virtual columns and partition columns are not included + * + */ + public static final String SCHEMA_EVOLUTION_COLUMNS = "schema.evolution.columns"; + public static final String SCHEMA_EVOLUTION_COLUMNS_TYPES = "schema.evolution.columns.types"; + @VisibleForTesting public static final String CUSTOM_TEXT_SERDE = "CustomTextSerde"; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java new file mode 100644 index 0000000..6c455bd --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SelfDescribingInputFormatInterface.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io; + +/** + * Marker interface to indicate a given input format is self-describing and + * can perform schema evolution itself. + */ +public interface SelfDescribingInputFormatInterface { + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java deleted file mode 100644 index e9e1d5a..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.mapred.FileInputFormat; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.InputSplit; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.mapred.Reporter; - -/** - * A MapReduce/Hive Vectorized input format for RC files. - */ -public class VectorizedRCFileInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch> - implements InputFormatChecker { - - public VectorizedRCFileInputFormat() { - setMinSplitSize(SequenceFile.SYNC_INTERVAL); - } - - @Override - @SuppressWarnings("unchecked") - public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(InputSplit split, JobConf job, - Reporter reporter) throws IOException { - - reporter.setStatus(split.toString()); - - return new VectorizedRCFileRecordReader(job, (FileSplit) split); - } - - @Override - public boolean validateInput(FileSystem fs, HiveConf conf, - List<FileStatus> files) throws IOException { - if (files.size() <= 0) { - return false; - } - for (int fileId = 0; fileId < files.size(); fileId++) { - RCFile.Reader reader = null; - try { - reader = new RCFile.Reader(fs, files.get(fileId) - .getPath(), conf); - reader.close(); - reader = null; - } catch (IOException e) { - return false; - } finally { - if (null != reader) { - reader.close(); - } - } - } - return true; - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java deleted file mode 100644 index 4cc1c2f..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hive.ql.io; - -import java.io.IOException; -import java.util.Collections; -import java.util.Map; -import java.util.WeakHashMap; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedBatchUtil; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; -import org.apache.hadoop.hive.ql.io.RCFile.KeyBuffer; -import org.apache.hadoop.hive.ql.io.RCFile.Reader; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.FileSplit; -import org.apache.hadoop.mapred.RecordReader; - -/** - * RCFileRecordReader. - */ -public class VectorizedRCFileRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> { - - private final Reader in; - private final long start; - private final long end; - private boolean more = true; - protected Configuration conf; - private final FileSplit split; - private final boolean useCache; - private VectorizedRowBatchCtx rbCtx; - private final LongWritable keyCache = new LongWritable(); - private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable(); - private boolean addPartitionCols = true; - private final DataOutputBuffer buffer = new DataOutputBuffer(); - - private static RCFileSyncCache syncCache = new RCFileSyncCache(); - - private static final class RCFileSyncEntry { - long end; - long endSync; - } - - private static final class RCFileSyncCache { - - private final Map<String, RCFileSyncEntry> cache; - - public RCFileSyncCache() { - cache = Collections.synchronizedMap(new WeakHashMap<String, RCFileSyncEntry>()); - } - - public void put(FileSplit split, long endSync) { - Path path = split.getPath(); - long end = split.getStart() + split.getLength(); - String key = path.toString() + "+" + String.format("%d", end); - - RCFileSyncEntry entry = new RCFileSyncEntry(); - entry.end = end; - entry.endSync = endSync; - if (entry.endSync >= entry.end) { - cache.put(key, entry); - } - } - - public long get(FileSplit split) { - Path path = split.getPath(); - long start = split.getStart(); - String key = path.toString() + "+" + String.format("%d", start); - RCFileSyncEntry entry = cache.get(key); - if (entry != null) { - return entry.endSync; - } - return -1; - } - } - - public VectorizedRCFileRecordReader(Configuration conf, FileSplit split) - throws IOException { - - Path path = split.getPath(); - FileSystem fs = path.getFileSystem(conf); - this.in = new RCFile.Reader(fs, path, conf); - this.end = split.getStart() + split.getLength(); - this.conf = conf; - this.split = split; - - useCache = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVEUSERCFILESYNCCACHE); - - if (split.getStart() > in.getPosition()) { - long oldSync = useCache ? 
syncCache.get(split) : -1; - if (oldSync == -1) { - in.sync(split.getStart()); // sync to start - } else { - in.seek(oldSync); - } - } - - this.start = in.getPosition(); - - more = start < end; - try { - rbCtx = new VectorizedRowBatchCtx(); - rbCtx.init(conf, split); - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public Class<?> getKeyClass() { - return LongWritable.class; - } - - public Class<?> getValueClass() { - return BytesRefArrayWritable.class; - } - - @Override - public NullWritable createKey() { - return NullWritable.get(); - } - - @Override - public VectorizedRowBatch createValue() { - VectorizedRowBatch result; - try { - result = rbCtx.createVectorizedRowBatch(); - } catch (HiveException e) { - throw new RuntimeException("Error creating a batch", e); - } - return result; - } - - public boolean nextBlock() throws IOException { - return in.nextBlock(); - } - - @Override - public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { - - // Reset column fields noNull values to true - VectorizedBatchUtil.setNoNullFields(value); - buffer.reset(); - value.selectedInUse = false; - for (int i = 0; i < value.numCols; i++) { - value.cols[i].isRepeating = false; - } - - int i = 0; - try { - - for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) { - more = next(keyCache); - if (more) { - // Check and update partition cols if necessary. Ideally this should be done - // in CreateValue() as the partition is constant per split. But since Hive uses - // CombineHiveRecordReader and as this does not call CreateValue() for - // each new RecordReader it creates, this check is required in next() - if (addPartitionCols) { - rbCtx.addPartitionColsToBatch(value); - addPartitionCols = false; - } - in.getCurrentRow(colsCache); - // Currently RCFile reader does not support reading vectorized - // data. Populating the batch by adding one row at a time. - rbCtx.addRowToBatch(i, (Writable) colsCache, value, buffer); - } else { - break; - } - } - } catch (Exception e) { - throw new RuntimeException("Error while getting next row", e); - } - value.size = i; - return more; - } - - protected boolean next(LongWritable key) throws IOException { - if (!more) { - return false; - } - - more = in.next(key); - - long lastSeenSyncPos = in.lastSeenSyncPos(); - - if (lastSeenSyncPos >= end) { - if (useCache) { - syncCache.put(split, lastSeenSyncPos); - } - more = false; - return more; - } - return more; - } - - /** - * Return the progress within the input split. 
- * - * @return 0.0 to 1.0 of the input byte range - */ - public float getProgress() throws IOException { - if (end == start) { - return 0.0f; - } else { - return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start)); - } - } - - public long getPos() throws IOException { - return in.getPosition(); - } - - public KeyBuffer getKeyBuffer() { - return in.getCurrentKeyBufferObj(); - } - - protected void seek(long pos) throws IOException { - in.seek(pos); - } - - public void sync(long pos) throws IOException { - in.sync(pos); - } - - public void resetBuffer() { - in.resetBuffer(); - } - - public long getStart() { - return start; - } - - public void close() throws IOException { - in.close(); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java deleted file mode 100644 index aaf4eb4..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ConversionTreeReaderFactory.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.orc; - -import java.io.IOException; -import java.util.List; - -/** - * Factory for creating ORC tree readers. These tree readers can handle type promotions and type - * conversions. - */ -public class ConversionTreeReaderFactory extends TreeReaderFactory { - - // TODO: This is currently only a place holder for type conversions. 
- - public static TreeReader createTreeReader(int columnId, - List<OrcProto.Type> types, - boolean[] included, - boolean skipCorrupt - ) throws IOException { - return TreeReaderFactory.createTreeReader(columnId, types, included, skipCorrupt); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index e3e6893..d81a12d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.AcidInputFormat; @@ -53,7 +54,9 @@ import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -103,7 +106,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; */ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, InputFormatChecker, VectorizedInputFormatInterface, - AcidInputFormat<NullWritable, OrcStruct>, CombineHiveInputFormat.AvoidSplitCombination { + SelfDescribingInputFormatInterface, AcidInputFormat<NullWritable, OrcStruct>, + CombineHiveInputFormat.AvoidSplitCombination { static enum SplitStrategyKind{ HYBRID, @@ -222,7 +226,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, Configuration conf, long offset, long length ) throws IOException { + + /** + * Do we have schema on read in the configuration variables? + * + * NOTE: This code path is NOT used by ACID. OrcInputFormat.getRecordReader intercepts for + * ACID tables creates raw record merger, etc. 
+ */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ false); + Reader.Options options = new Reader.Options().range(offset, length); + options.schema(schema); boolean isOriginal = isOriginal(file); List<OrcProto.Type> types = file.getTypes(); options.include(genIncludedColumns(types, conf, isOriginal)); @@ -1167,7 +1181,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, if (vectorMode) { return (org.apache.hadoop.mapred.RecordReader) - new VectorizedOrcAcidRowReader(inner, conf, (FileSplit) inputSplit); + new VectorizedOrcAcidRowReader(inner, conf, + Utilities.getMapWork(conf).getVectorizedRowBatchCtx(), (FileSplit) inputSplit); } return new NullKeyRecordReader(inner, conf); } @@ -1218,10 +1233,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } } + // The schema type description does not include the ACID fields (i.e. it is the + // non-ACID original schema). + private static boolean SCHEMA_TYPES_IS_ORIGINAL = true; @Override public RowReader<OrcStruct> getReader(InputSplit inputSplit, - Options options) throws IOException { + Options options) + throws IOException { final OrcSplit split = (OrcSplit) inputSplit; final Path path = split.getPath(); Path root; @@ -1236,36 +1255,33 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } final Path[] deltas = AcidUtils.deserializeDeltas(root, split.getDeltas()); final Configuration conf = options.getConfiguration(); + + + /** + * Do we have schema on read in the configuration variables? + */ + TypeDescription schema = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true); + if (schema == null) { + throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg()); + } + final Reader reader; final int bucket; - Reader.Options readOptions = new Reader.Options(); + Reader.Options readOptions = new Reader.Options().schema(schema); readOptions.range(split.getStart(), split.getLength()); + + // TODO: Convert genIncludedColumns and setSearchArgument to use TypeDescription. 
+ final List<Type> schemaTypes = OrcUtils.getOrcTypes(schema); + readOptions.include(genIncludedColumns(schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL)); + setSearchArgument(readOptions, schemaTypes, conf, SCHEMA_TYPES_IS_ORIGINAL); + if (split.hasBase()) { bucket = AcidUtils.parseBaseBucketFilename(split.getPath(), conf) .getBucket(); reader = OrcFile.createReader(path, OrcFile.readerOptions(conf)); - final List<OrcProto.Type> types = reader.getTypes(); - readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); - setSearchArgument(readOptions, types, conf, split.isOriginal()); } else { bucket = (int) split.getStart(); reader = null; - if(deltas != null && deltas.length > 0) { - Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket); - OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf); - FileSystem fs = readerOptions.getFilesystem(); - if(fs == null) { - fs = path.getFileSystem(options.getConfiguration()); - } - if(fs.exists(bucketPath)) { - /* w/o schema evolution (which ACID doesn't support yet) all delta - files have the same schema, so choosing the 1st one*/ - final List<OrcProto.Type> types = - OrcFile.createReader(bucketPath, readerOptions).getTypes(); - readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); - setSearchArgument(readOptions, types, conf, split.isOriginal()); - } - } } String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); @@ -1278,9 +1294,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, @Override public ObjectInspector getObjectInspector() { - return ((StructObjectInspector) records.getObjectInspector()) - .getAllStructFieldRefs().get(OrcRecordUpdater.ROW) - .getFieldObjectInspector(); + return OrcStruct.createObjectInspector(0, schemaTypes); } @Override @@ -1367,5 +1381,4 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, bucket, validTxnList, new Reader.Options(), deltaDirectory); } - } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java index ab0c364..bad2a4c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java @@ -18,30 +18,25 @@ package org.apache.hadoop.hive.ql.io.orc; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.RecordIdentifier; -import org.apache.hadoop.hive.ql.metadata.VirtualColumn; -import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.IntWritable; import 
org.apache.hadoop.io.LongWritable; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Deque; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -57,6 +52,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ private final Configuration conf; private final boolean collapse; private final RecordReader baseReader; + private final ObjectInspector objectInspector; private final long offset; private final long length; private final ValidTxnList validTxnList; @@ -443,6 +439,15 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ this.offset = options.getOffset(); this.length = options.getLength(); this.validTxnList = validTxnList; + + TypeDescription typeDescr = OrcUtils.getDesiredRowTypeDescr(conf, /* isAcid */ true); + if (typeDescr == null) { + throw new IOException(ErrorMsg.SCHEMA_REQUIRED_TO_READ_ACID_TABLES.getErrorCodedMsg()); + } + + objectInspector = OrcRecordUpdater.createEventSchema + (OrcStruct.createObjectInspector(0, OrcUtils.getOrcTypes(typeDescr))); + // modify the options to reflect the event instead of the base row Reader.Options eventOptions = createEventOptions(options); if (reader == null) { @@ -672,46 +677,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{ @Override public ObjectInspector getObjectInspector() { - // Read the configuration parameters - String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); - // NOTE: if "columns.types" is missing, all columns will be of String type - String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); - - // Parse the configuration parameters - ArrayList<String> columnNames = new ArrayList<String>(); - Deque<Integer> virtualColumns = new ArrayDeque<Integer>(); - if (columnNameProperty != null && columnNameProperty.length() > 0) { - String[] colNames = columnNameProperty.split(","); - for (int i = 0; i < colNames.length; i++) { - if (VirtualColumn.VIRTUAL_COLUMN_NAMES.contains(colNames[i])) { - virtualColumns.addLast(i); - } else { - columnNames.add(colNames[i]); - } - } - } - if (columnTypeProperty == null) { - // Default type: all string - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < columnNames.size(); i++) { - if (i > 0) { - sb.append(":"); - } - sb.append("string"); - } - columnTypeProperty = sb.toString(); - } - - ArrayList<TypeInfo> fieldTypes = - TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - while (virtualColumns.size() > 0) { - fieldTypes.remove(virtualColumns.removeLast()); - } - StructTypeInfo rowType = new StructTypeInfo(); - rowType.setAllStructFieldNames(columnNames); - rowType.setAllStructFieldTypeInfos(fieldTypes); - return OrcRecordUpdater.createEventSchema - (OrcStruct.createObjectInspector(rowType)); + return objectInspector; } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java index db2ca15..ad4a9e8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcUtils.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hive.ql.io.orc; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import 
java.util.List; @@ -30,6 +31,21 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.hive.ql.io.IOConstants; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.typeinfo.BaseCharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import com.google.common.collect.Lists; @@ -204,4 +220,550 @@ public class OrcUtils { return numWriters; } + /** + * Convert a Hive type property string that contains separated type names into a list of + * TypeDescription objects. + * @return the list of TypeDescription objects. + */ + public static ArrayList<TypeDescription> typeDescriptionsFromHiveTypeProperty( + String hiveTypeProperty) { + + // CONSDIER: We need a type name parser for TypeDescription. + + ArrayList<TypeInfo> typeInfoList = TypeInfoUtils.getTypeInfosFromTypeString(hiveTypeProperty); + ArrayList<TypeDescription> typeDescrList =new ArrayList<TypeDescription>(typeInfoList.size()); + for (TypeInfo typeInfo : typeInfoList) { + typeDescrList.add(convertTypeInfo(typeInfo)); + } + return typeDescrList; + } + + public static TypeDescription convertTypeInfo(TypeInfo info) { + switch (info.getCategory()) { + case PRIMITIVE: { + PrimitiveTypeInfo pinfo = (PrimitiveTypeInfo) info; + switch (pinfo.getPrimitiveCategory()) { + case BOOLEAN: + return TypeDescription.createBoolean(); + case BYTE: + return TypeDescription.createByte(); + case SHORT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case LONG: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case STRING: + return TypeDescription.createString(); + case DATE: + return TypeDescription.createDate(); + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case BINARY: + return TypeDescription.createBinary(); + case DECIMAL: { + DecimalTypeInfo dinfo = (DecimalTypeInfo) pinfo; + return TypeDescription.createDecimal() + .withScale(dinfo.getScale()) + .withPrecision(dinfo.getPrecision()); + } + case VARCHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createVarchar() + .withMaxLength(cinfo.getLength()); + } + case CHAR: { + BaseCharTypeInfo cinfo = (BaseCharTypeInfo) pinfo; + return TypeDescription.createChar() + .withMaxLength(cinfo.getLength()); + } + default: + throw new IllegalArgumentException("ORC doesn't handle primitive" + + " category " + pinfo.getPrimitiveCategory()); + } + } + case LIST: { + ListTypeInfo linfo = (ListTypeInfo) info; + return TypeDescription.createList + (convertTypeInfo(linfo.getListElementTypeInfo())); 
+ } + case MAP: { + MapTypeInfo minfo = (MapTypeInfo) info; + return TypeDescription.createMap + (convertTypeInfo(minfo.getMapKeyTypeInfo()), + convertTypeInfo(minfo.getMapValueTypeInfo())); + } + case UNION: { + UnionTypeInfo minfo = (UnionTypeInfo) info; + TypeDescription result = TypeDescription.createUnion(); + for (TypeInfo child: minfo.getAllUnionObjectTypeInfos()) { + result.addUnionChild(convertTypeInfo(child)); + } + return result; + } + case STRUCT: { + StructTypeInfo sinfo = (StructTypeInfo) info; + TypeDescription result = TypeDescription.createStruct(); + for(String fieldName: sinfo.getAllStructFieldNames()) { + result.addField(fieldName, + convertTypeInfo(sinfo.getStructFieldTypeInfo(fieldName))); + } + return result; + } + default: + throw new IllegalArgumentException("ORC doesn't handle " + + info.getCategory()); + } + } + + public static List<OrcProto.Type> getOrcTypes(TypeDescription typeDescr) { + List<OrcProto.Type> result = Lists.newArrayList(); + appendOrcTypes(result, typeDescr); + return result; + } + + private static void appendOrcTypes(List<OrcProto.Type> result, TypeDescription typeDescr) { + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + List<TypeDescription> children = typeDescr.getChildren(); + switch (typeDescr.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case VARCHAR: + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(typeDescr.getPrecision()); + type.setScale(typeDescr.getScale()); + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(children.get(0).getId()); + break; + case MAP: + type.setKind(OrcProto.Type.Kind.MAP); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + case STRUCT: + type.setKind(OrcProto.Type.Kind.STRUCT); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + for(String field: typeDescr.getFieldNames()) { + type.addFieldNames(field); + } + break; + case UNION: + type.setKind(OrcProto.Type.Kind.UNION); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + + typeDescr.getCategory()); + } + result.add(type.build()); + if (children != null) { + for(TypeDescription child: children) { + appendOrcTypes(result, child); + } + } + } + + /** + * NOTE: This method ignores the subtype numbers in the TypeDescription rebuilds the subtype + * numbers based on the length of the result list being appended. 
+ * + * @param result + * @param typeInfo + */ + public static void appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result, + TypeDescription typeDescr) { + + int subtype = result.size(); + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + boolean needsAdd = true; + List<TypeDescription> children = typeDescr.getChildren(); + switch (typeDescr.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case VARCHAR: + type.setKind(Type.Kind.VARCHAR); + type.setMaximumLength(typeDescr.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(typeDescr.getPrecision()); + type.setScale(typeDescr.getScale()); + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(++subtype); + result.add(type.build()); + needsAdd = false; + appendOrcTypesRebuildSubtypes(result, children.get(0)); + break; + case MAP: + { + // Make room for MAP type. + result.add(null); + + // Add MAP type pair in order to determine their subtype values. + appendOrcTypesRebuildSubtypes(result, children.get(0)); + int subtype2 = result.size(); + appendOrcTypesRebuildSubtypes(result, children.get(1)); + type.setKind(OrcProto.Type.Kind.MAP); + type.addSubtypes(subtype + 1); + type.addSubtypes(subtype2); + result.set(subtype, type.build()); + needsAdd = false; + } + break; + case STRUCT: + { + List<String> fieldNames = typeDescr.getFieldNames(); + + // Make room for STRUCT type. + result.add(null); + + List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size()); + for(TypeDescription child: children) { + int fieldSubtype = result.size(); + fieldSubtypes.add(fieldSubtype); + appendOrcTypesRebuildSubtypes(result, child); + } + + type.setKind(OrcProto.Type.Kind.STRUCT); + + for (int i = 0 ; i < fieldNames.size(); i++) { + type.addSubtypes(fieldSubtypes.get(i)); + type.addFieldNames(fieldNames.get(i)); + } + result.set(subtype, type.build()); + needsAdd = false; + } + break; + case UNION: + { + // Make room for UNION type. 
+ result.add(null); + + List<Integer> unionSubtypes = new ArrayList<Integer>(children.size()); + for(TypeDescription child: children) { + int unionSubtype = result.size(); + unionSubtypes.add(unionSubtype); + appendOrcTypesRebuildSubtypes(result, child); + } + + type.setKind(OrcProto.Type.Kind.UNION); + for (int i = 0 ; i < children.size(); i++) { + type.addSubtypes(unionSubtypes.get(i)); + } + result.set(subtype, type.build()); + needsAdd = false; + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + typeDescr.getCategory()); + } + if (needsAdd) { + result.add(type.build()); + } + } + + /** + * NOTE: This method ignores the subtype numbers in the OrcProto.Type rebuilds the subtype + * numbers based on the length of the result list being appended. + * + * @param result + * @param typeInfo + */ + public static int appendOrcTypesRebuildSubtypes(List<OrcProto.Type> result, + List<OrcProto.Type> types, int columnId) { + + OrcProto.Type oldType = types.get(columnId++); + + int subtype = result.size(); + OrcProto.Type.Builder builder = OrcProto.Type.newBuilder(); + boolean needsAdd = true; + switch (oldType.getKind()) { + case BOOLEAN: + builder.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + builder.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + builder.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + builder.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + builder.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + builder.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + builder.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + builder.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + builder.setKind(OrcProto.Type.Kind.CHAR); + builder.setMaximumLength(oldType.getMaximumLength()); + break; + case VARCHAR: + builder.setKind(Type.Kind.VARCHAR); + builder.setMaximumLength(oldType.getMaximumLength()); + break; + case BINARY: + builder.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + builder.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + builder.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + builder.setKind(OrcProto.Type.Kind.DECIMAL); + builder.setPrecision(oldType.getPrecision()); + builder.setScale(oldType.getScale()); + break; + case LIST: + builder.setKind(OrcProto.Type.Kind.LIST); + builder.addSubtypes(++subtype); + result.add(builder.build()); + needsAdd = false; + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + break; + case MAP: + { + // Make room for MAP type. + result.add(null); + + // Add MAP type pair in order to determine their subtype values. + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + int subtype2 = result.size(); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + builder.setKind(OrcProto.Type.Kind.MAP); + builder.addSubtypes(subtype + 1); + builder.addSubtypes(subtype2); + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + case STRUCT: + { + List<String> fieldNames = oldType.getFieldNamesList(); + + // Make room for STRUCT type. 
+ result.add(null); + + List<Integer> fieldSubtypes = new ArrayList<Integer>(fieldNames.size()); + for(int i = 0 ; i < fieldNames.size(); i++) { + int fieldSubtype = result.size(); + fieldSubtypes.add(fieldSubtype); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + } + + builder.setKind(OrcProto.Type.Kind.STRUCT); + + for (int i = 0 ; i < fieldNames.size(); i++) { + builder.addSubtypes(fieldSubtypes.get(i)); + builder.addFieldNames(fieldNames.get(i)); + } + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + case UNION: + { + int subtypeCount = oldType.getSubtypesCount(); + + // Make room for UNION type. + result.add(null); + + List<Integer> unionSubtypes = new ArrayList<Integer>(subtypeCount); + for(int i = 0 ; i < subtypeCount; i++) { + int unionSubtype = result.size(); + unionSubtypes.add(unionSubtype); + columnId = appendOrcTypesRebuildSubtypes(result, types, columnId); + } + + builder.setKind(OrcProto.Type.Kind.UNION); + for (int i = 0 ; i < subtypeCount; i++) { + builder.addSubtypes(unionSubtypes.get(i)); + } + result.set(subtype, builder.build()); + needsAdd = false; + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + oldType.getKind()); + } + if (needsAdd) { + result.add(builder.build()); + } + return columnId; + } + + public static TypeDescription getDesiredRowTypeDescr(Configuration conf, boolean isAcid) { + + String columnNameProperty = null; + String columnTypeProperty = null; + + ArrayList<String> schemaEvolutionColumnNames = null; + ArrayList<TypeDescription> schemaEvolutionTypeDescrs = null; + + boolean haveSchemaEvolutionProperties = false; + if (isAcid || HiveConf.getBoolVar(conf, ConfVars.HIVE_SCHEMA_EVOLUTION)) { + + columnNameProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS); + columnTypeProperty = conf.get(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES); + + haveSchemaEvolutionProperties = + (columnNameProperty != null && columnTypeProperty != null); + + if (haveSchemaEvolutionProperties) { + schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(",")); + if (schemaEvolutionColumnNames.size() == 0) { + haveSchemaEvolutionProperties = false; + } else { + schemaEvolutionTypeDescrs = + OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + haveSchemaEvolutionProperties = false; + } + } + } + } + + if (haveSchemaEvolutionProperties) { + LOG.info("Using schema evolution configuration variables " + + "schema.evolution.columns " + + schemaEvolutionColumnNames.toString() + + " / schema.evolution.columns.types " + + schemaEvolutionTypeDescrs.toString() + + " (isAcid " + + isAcid + + ")"); + + } else { + + // Try regular properties; + columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); + columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); + if (columnTypeProperty == null || columnNameProperty == null) { + return null; + } + + schemaEvolutionColumnNames = Lists.newArrayList(columnNameProperty.split(",")); + if (schemaEvolutionColumnNames.size() == 0) { + return null; + } + schemaEvolutionTypeDescrs = + OrcUtils.typeDescriptionsFromHiveTypeProperty(columnTypeProperty); + if (schemaEvolutionTypeDescrs.size() != schemaEvolutionColumnNames.size()) { + return null; + } + LOG.info("Using column configuration variables " + + "columns " + + schemaEvolutionColumnNames.toString() + + " / columns.types " + + schemaEvolutionTypeDescrs.toString() + + " (isAcid " + + isAcid + + ")"); + } 
+ + // Desired schema does not include virtual columns or partition columns. + TypeDescription result = TypeDescription.createStruct(); + for (int i = 0; i < schemaEvolutionColumnNames.size(); i++) { + result.addField(schemaEvolutionColumnNames.get(i), schemaEvolutionTypeDescrs.get(i)); + } + + return result; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java index 8558592..6dbe461 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -152,6 +153,7 @@ public interface Reader { private boolean[] include; private long offset = 0; private long length = Long.MAX_VALUE; + private TypeDescription schema; private SearchArgument sarg = null; private String[] columnNames = null; @@ -178,6 +180,14 @@ public interface Reader { } /** + * Set the schema on read type description. + */ + public Options schema(TypeDescription schema) { + this.schema = schema; + return this; + } + + /** * Set search argument for predicate push down. * @param sarg the search argument * @param columnNames the column names for @@ -201,6 +211,10 @@ public interface Reader { return length; } + public TypeDescription getSchema() { + return schema; + } + public SearchArgument getSearchArgument() { return sarg; } @@ -222,6 +236,7 @@ public interface Reader { result.include = include; result.offset = offset; result.length = length; + result.schema = schema; result.sarg = sarg; result.columnNames = columnNames; return result; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java deleted file mode 100644 index 8740ee6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderFactory.java +++ /dev/null @@ -1,269 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hive.ql.io.orc; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; - -import com.google.common.collect.Lists; - -/** - * Factory to create ORC tree readers. It also compares file schema with schema specified on read - * to see if type promotions are possible. - */ -public class RecordReaderFactory { - static final Log LOG = LogFactory.getLog(RecordReaderFactory.class); - private static final boolean isLogInfoEnabled = LOG.isInfoEnabled(); - - public static TreeReaderFactory.TreeReader createTreeReader(int colId, - Configuration conf, - List<OrcProto.Type> fileSchema, - boolean[] included, - boolean skipCorrupt) throws IOException { - final boolean isAcid = checkAcidSchema(fileSchema); - final List<OrcProto.Type> originalFileSchema; - if (isAcid) { - originalFileSchema = fileSchema.subList(fileSchema.get(0).getSubtypesCount(), - fileSchema.size()); - } else { - originalFileSchema = fileSchema; - } - final int numCols = originalFileSchema.get(0).getSubtypesCount(); - List<OrcProto.Type> schemaOnRead = getSchemaOnRead(numCols, conf); - List<OrcProto.Type> schemaUsed = getMatchingSchema(fileSchema, schemaOnRead); - if (schemaUsed == null) { - return TreeReaderFactory.createTreeReader(colId, fileSchema, included, skipCorrupt); - } else { - return ConversionTreeReaderFactory.createTreeReader(colId, schemaUsed, included, skipCorrupt); - } - } - - private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { - if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { - List<String> acidFields = OrcRecordUpdater.getAcidEventFields(); - List<String> rootFields = fileSchema.get(0).getFieldNamesList(); - if (acidFields.equals(rootFields)) { - return true; - } - } - return false; - } - - private static List<OrcProto.Type> getMatchingSchema(List<OrcProto.Type> fileSchema, - List<OrcProto.Type> schemaOnRead) { - if (schemaOnRead == null) { - if (isLogInfoEnabled) { - LOG.info("Schema is not specified on read. Using file schema."); - } - return null; - } - - if (fileSchema.size() != schemaOnRead.size()) { - if (isLogInfoEnabled) { - LOG.info("Schema on read column count does not match file schema's column count." + - " Falling back to using file schema."); - } - return null; - } else { - List<OrcProto.Type> result = Lists.newArrayList(fileSchema); - // check type promotion. 
ORC can only support type promotions for integer types - // short -> int -> bigint as same integer readers are used for the above types. - boolean canPromoteType = false; - for (int i = 0; i < fileSchema.size(); i++) { - OrcProto.Type fColType = fileSchema.get(i); - OrcProto.Type rColType = schemaOnRead.get(i); - if (!fColType.getKind().equals(rColType.getKind())) { - - if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { - - if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || - rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting SHORT to INT/LONG requested type - result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build()); - canPromoteType = true; - } else { - canPromoteType = false; - } - - } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { - - if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { - // type promotion possible, converting INT to LONG requested type - result.set(i, result.get(i).toBuilder().setKind(rColType.getKind()).build()); - canPromoteType = true; - } else { - canPromoteType = false; - } - - } else { - canPromoteType = false; - } - } - } - - if (canPromoteType) { - if (isLogInfoEnabled) { - LOG.info("Integer type promotion happened in ORC record reader. Using promoted schema."); - } - return result; - } - } - - return null; - } - - private static List<OrcProto.Type> getSchemaOnRead(int numCols, Configuration conf) { - String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES); - final String columnNameProperty = conf.get(serdeConstants.LIST_COLUMNS); - if (columnTypeProperty == null || columnNameProperty == null) { - return null; - } - - ArrayList<String> columnNames = Lists.newArrayList(columnNameProperty.split(",")); - ArrayList<TypeInfo> fieldTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty); - StructTypeInfo structTypeInfo = new StructTypeInfo(); - // Column types from conf includes virtual and partition columns at the end. We consider only - // the actual columns in the file. 
- structTypeInfo.setAllStructFieldNames(Lists.newArrayList(columnNames.subList(0, numCols))); - structTypeInfo.setAllStructFieldTypeInfos(Lists.newArrayList(fieldTypes.subList(0, numCols))); - ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(structTypeInfo); - return getOrcTypes(oi); - } - - private static List<OrcProto.Type> getOrcTypes(ObjectInspector inspector) { - List<OrcProto.Type> result = Lists.newArrayList(); - getOrcTypesImpl(result, inspector); - return result; - } - - private static void getOrcTypesImpl(List<OrcProto.Type> result, ObjectInspector inspector) { - OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - switch (inspector.getCategory()) { - case PRIMITIVE: - switch (((PrimitiveObjectInspector) inspector).getPrimitiveCategory()) { - case BOOLEAN: - type.setKind(OrcProto.Type.Kind.BOOLEAN); - break; - case BYTE: - type.setKind(OrcProto.Type.Kind.BYTE); - break; - case SHORT: - type.setKind(OrcProto.Type.Kind.SHORT); - break; - case INT: - type.setKind(OrcProto.Type.Kind.INT); - break; - case LONG: - type.setKind(OrcProto.Type.Kind.LONG); - break; - case FLOAT: - type.setKind(OrcProto.Type.Kind.FLOAT); - break; - case DOUBLE: - type.setKind(OrcProto.Type.Kind.DOUBLE); - break; - case STRING: - type.setKind(OrcProto.Type.Kind.STRING); - break; - case CHAR: - // The char length needs to be written to file and should be available - // from the object inspector - CharTypeInfo charTypeInfo = (CharTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.CHAR); - type.setMaximumLength(charTypeInfo.getLength()); - break; - case VARCHAR: - // The varchar length needs to be written to file and should be available - // from the object inspector - VarcharTypeInfo typeInfo = (VarcharTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.VARCHAR); - type.setMaximumLength(typeInfo.getLength()); - break; - case BINARY: - type.setKind(OrcProto.Type.Kind.BINARY); - break; - case TIMESTAMP: - type.setKind(OrcProto.Type.Kind.TIMESTAMP); - break; - case DATE: - type.setKind(OrcProto.Type.Kind.DATE); - break; - case DECIMAL: - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo) ((PrimitiveObjectInspector) inspector) - .getTypeInfo(); - type.setKind(OrcProto.Type.Kind.DECIMAL); - type.setPrecision(decTypeInfo.precision()); - type.setScale(decTypeInfo.scale()); - break; - default: - throw new IllegalArgumentException("Unknown primitive category: " + - ((PrimitiveObjectInspector) inspector).getPrimitiveCategory()); - } - result.add(type.build()); - break; - case LIST: - type.setKind(OrcProto.Type.Kind.LIST); - result.add(type.build()); - getOrcTypesImpl(result, ((ListObjectInspector) inspector).getListElementObjectInspector()); - break; - case MAP: - type.setKind(OrcProto.Type.Kind.MAP); - result.add(type.build()); - getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapKeyObjectInspector()); - getOrcTypesImpl(result, ((MapObjectInspector) inspector).getMapValueObjectInspector()); - break; - case STRUCT: - type.setKind(OrcProto.Type.Kind.STRUCT); - result.add(type.build()); - for (StructField field : ((StructObjectInspector) inspector).getAllStructFieldRefs()) { - getOrcTypesImpl(result, field.getFieldObjectInspector()); - } - break; - case UNION: - type.setKind(OrcProto.Type.Kind.UNION); - result.add(type.build()); - for (ObjectInspector oi : ((UnionObjectInspector) inspector).getObjectInspectors()) { - getOrcTypesImpl(result, oi); - } - break; - default: - throw new 
IllegalArgumentException("Unknown category: " + inspector.getCategory()); - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index c2d280d..24834a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@ -47,6 +47,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool; import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReader; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@ -154,15 +156,27 @@ class RecordReaderImpl implements RecordReader { } protected RecordReaderImpl(List<StripeInformation> stripes, - FileSystem fileSystem, - Path path, - Reader.Options options, - List<OrcProto.Type> types, - CompressionCodec codec, - int bufferSize, - long strideRate, - Configuration conf - ) throws IOException { + FileSystem fileSystem, + Path path, + Reader.Options options, + List<OrcProto.Type> types, + CompressionCodec codec, + int bufferSize, + long strideRate, + Configuration conf + ) throws IOException { + + TreeReaderSchema treeReaderSchema; + if (options.getSchema() == null) { + treeReaderSchema = new TreeReaderSchema().fileTypes(types).schemaTypes(types); + } else { + + // Now that we are creating a record reader for a file, validate that the schema to read + // is compatible with the file schema. 
+ // + List<Type> schemaTypes = OrcUtils.getOrcTypes(options.getSchema()); + treeReaderSchema = SchemaEvolution.validateAndCreate(types, schemaTypes); + } this.path = path; this.file = fileSystem.open(path); this.codec = codec; @@ -200,7 +214,7 @@ class RecordReaderImpl implements RecordReader { firstRow = skippedRows; totalRowCount = rows; boolean skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); - reader = RecordReaderFactory.createTreeReader(0, conf, types, included, skipCorrupt); + reader = TreeReaderFactory.createTreeReader(0, treeReaderSchema, included, skipCorrupt); indexes = new OrcProto.RowIndex[types.size()]; bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()]; advanceToNextRow(reader, 0L, true); @@ -1085,6 +1099,7 @@ class RecordReaderImpl implements RecordReader { } else { result = (VectorizedRowBatch) previous; result.selectedInUse = false; + reader.setVectorColumnCount(result.getDataColumnCount()); reader.nextVector(result.cols, (int) batchSize); } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java new file mode 100644 index 0000000..9d00eb2 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SchemaEvolution.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.ql.io.orc.OrcProto.Type; +import org.apache.hadoop.hive.ql.io.orc.TreeReaderFactory.TreeReaderSchema; + +/** + * Take the file types and the (optional) configuration column names/types and see if there + * has been schema evolution. + */ +public class SchemaEvolution { + + private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); + + public static TreeReaderSchema validateAndCreate(List<OrcProto.Type> fileTypes, + List<OrcProto.Type> schemaTypes) throws IOException { + + // For ACID, the row is the ROW field in the outer STRUCT. + final boolean isAcid = checkAcidSchema(fileTypes); + final List<OrcProto.Type> rowSchema; + int rowSubtype; + if (isAcid) { + rowSubtype = OrcRecordUpdater.ROW + 1; + rowSchema = fileTypes.subList(rowSubtype, fileTypes.size()); + } else { + rowSubtype = 0; + rowSchema = fileTypes; + } + + // Do checking on the overlap. Additional columns will be defaulted to NULL. 
+ + int numFileColumns = rowSchema.get(0).getSubtypesCount(); + int numDesiredColumns = schemaTypes.get(0).getSubtypesCount(); + + int numReadColumns = Math.min(numFileColumns, numDesiredColumns); + + /** + * Check type promotion. + * + * Currently, we only support integer type promotions that can be done "implicitly". + * That is, we know that using a bigger integer tree reader on the original smaller integer + * column will "just work". + * + * In the future, other type promotions might require type conversion. + */ + // short -> int -> bigint as same integer readers are used for the above types. + + for (int i = 0; i < numReadColumns; i++) { + OrcProto.Type fColType = fileTypes.get(rowSubtype + i); + OrcProto.Type rColType = schemaTypes.get(i); + if (!fColType.getKind().equals(rColType.getKind())) { + + boolean ok = false; + if (fColType.getKind().equals(OrcProto.Type.Kind.SHORT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.INT) || + rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting SHORT to INT/LONG requested type + ok = true; + } + } else if (fColType.getKind().equals(OrcProto.Type.Kind.INT)) { + + if (rColType.getKind().equals(OrcProto.Type.Kind.LONG)) { + // type promotion possible, converting INT to LONG requested type + ok = true; + } + } + + if (!ok) { + throw new IOException("ORC does not support type conversion from " + + fColType.getKind().name() + " to " + rColType.getKind().name()); + } + } + } + + List<Type> fullSchemaTypes; + + if (isAcid) { + fullSchemaTypes = new ArrayList<OrcProto.Type>(); + + // This copies the ACID struct type which is subtype = 0. + // It has field names "operation" through "row". + // And we copy the types for all fields EXCEPT ROW (which must be last!). + + for (int i = 0; i < rowSubtype; i++) { + fullSchemaTypes.add(fileTypes.get(i).toBuilder().build()); + } + + // Add the row struct type. + OrcUtils.appendOrcTypesRebuildSubtypes(fullSchemaTypes, schemaTypes, 0); + } else { + fullSchemaTypes = schemaTypes; + } + + int innerStructSubtype = rowSubtype; + + // LOG.info("Schema evolution: (fileTypes) " + fileTypes.toString() + + // " (schemaEvolutionTypes) " + schemaEvolutionTypes.toString()); + + return new TreeReaderSchema(). + fileTypes(fileTypes). + schemaTypes(fullSchemaTypes). + innerStructSubtype(innerStructSubtype); + } + + private static boolean checkAcidSchema(List<OrcProto.Type> fileSchema) { + if (fileSchema.get(0).getKind().equals(OrcProto.Type.Kind.STRUCT)) { + List<String> rootFields = fileSchema.get(0).getFieldNamesList(); + if (acidEventFieldNames.equals(rootFields)) { + return true; + } + } + return false; + } + + /** + * @param typeDescr + * @return ORC types for the ACID event based on the row's type description + */ + public static List<Type> createEventSchema(TypeDescription typeDescr) { + + List<Type> result = new ArrayList<Type>(); + + OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); + type.setKind(OrcProto.Type.Kind.STRUCT); + type.addAllFieldNames(acidEventFieldNames); + for (int i = 0; i < acidEventFieldNames.size(); i++) { + type.addSubtypes(i + 1); + } + result.add(type.build()); + + // Automatically add all fields except the last (ROW). 
+ for (int i = 0; i < acidEventOrcTypeKinds.size() - 1; i ++) { + type.clear(); + type.setKind(acidEventOrcTypeKinds.get(i)); + result.add(type.build()); + } + + OrcUtils.appendOrcTypesRebuildSubtypes(result, typeDescr); + return result; + } + + public static final List<String> acidEventFieldNames= new ArrayList<String>(); + static { + acidEventFieldNames.add("operation"); + acidEventFieldNames.add("originalTransaction"); + acidEventFieldNames.add("bucket"); + acidEventFieldNames.add("rowId"); + acidEventFieldNames.add("currentTransaction"); + acidEventFieldNames.add("row"); + } + public static final List<OrcProto.Type.Kind> acidEventOrcTypeKinds = + new ArrayList<OrcProto.Type.Kind>(); + static { + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.INT); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.LONG); + acidEventOrcTypeKinds.add(OrcProto.Type.Kind.STRUCT); + } +} \ No newline at end of file
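Editorial note on the new Reader.Options#schema(TypeDescription) hook added above: a caller that wants schema-on-read builds the desired row TypeDescription (excluding virtual and partition columns, as the comment at the top of this section notes) and passes it through the reader options; RecordReaderImpl then runs SchemaEvolution.validateAndCreate against the file types. The following is only a minimal sketch of that usage under stated assumptions: the file path, the column names, and the use of rowsOptions() are illustrative and are not taken from this patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Reader;
    import org.apache.hadoop.hive.ql.io.orc.RecordReader;
    import org.apache.hadoop.hive.ql.io.orc.TypeDescription;

    public class SchemaOnReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("/tmp/example.orc");   // hypothetical file

        // Desired schema on read; if the file wrote "id" as SHORT or INT,
        // SchemaEvolution.validateAndCreate accepts reading it as LONG.
        TypeDescription readSchema = TypeDescription.createStruct();
        readSchema.addField("id", TypeDescription.createLong());
        readSchema.addField("name", TypeDescription.createString());

        Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
        Reader.Options options = new Reader.Options().schema(readSchema);

        RecordReader rows = reader.rowsOptions(options);
        Object row = null;
        while (rows.hasNext()) {
          row = rows.next(row);
          // process the returned row here
        }
        rows.close();
      }
    }

If options.getSchema() is left null, the RecordReaderImpl hunk above falls back to using the file types for both sides of the TreeReaderSchema, i.e. no evolution is attempted.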

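The per-column compatibility rule that validateAndCreate enforces (previously buried in the deleted RecordReaderFactory.getMatchingSchema) amounts to: identical kinds always match, SHORT may be widened to INT or LONG, INT may be widened to LONG, and any other mismatch is rejected with an IOException rather than silently converted. A small stand-alone sketch of that rule, for illustration only (the class and method names below are hypothetical):

    import org.apache.hadoop.hive.ql.io.orc.OrcProto;

    final class IntegerPromotionRule {
      // Mirrors the check in SchemaEvolution.validateAndCreate: only the
      // "implicit" integer widenings are allowed, because the same integer
      // tree reader can serve the wider requested type.
      static boolean isReadable(OrcProto.Type.Kind fileKind, OrcProto.Type.Kind readKind) {
        if (fileKind.equals(readKind)) {
          return true;                                  // no promotion needed
        }
        if (fileKind == OrcProto.Type.Kind.SHORT) {
          return readKind == OrcProto.Type.Kind.INT
              || readKind == OrcProto.Type.Kind.LONG;   // SHORT -> INT/LONG
        }
        if (fileKind == OrcProto.Type.Kind.INT) {
          return readKind == OrcProto.Type.Kind.LONG;   // INT -> LONG
        }
        return false;                                   // would need real type conversion
      }
    }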