HIVE-14007. Replace hive-orc module with ORC 1.3.1
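In short: this commit deletes Hive's forked hive-orc module (the entire orc/ tree in the diffstat below) and replaces it with a dependency on the standalone org.apache.orc:orc-core 1.3.1 artifact. For orientation only, a sketch of writing an ORC file through the standalone API that Hive now consumes; this is not code from the commit, and the output path and schema are invented:

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class OrcCoreWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Invented schema and path, purely for illustration.
        TypeDescription schema = TypeDescription.fromString("struct<x:int,y:string>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/example.orc"),
            OrcFile.writerOptions(conf).setSchema(schema));
        VectorizedRowBatch batch = schema.createRowBatch();
        LongColumnVector x = (LongColumnVector) batch.cols[0];
        BytesColumnVector y = (BytesColumnVector) batch.cols[1];
        int row = batch.size++;
        x.vector[row] = 1;                                        // column x
        y.setVal(row, "hello".getBytes(StandardCharsets.UTF_8));  // column y
        writer.addRowBatch(batch);
        writer.close();
      }
    }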
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d7f71fb4
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d7f71fb4
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d7f71fb4

Branch: refs/heads/master
Commit: d7f71fb4a6d39765a580568c6dd8b1ac2c9c708a
Parents: 7a30285
Author: Owen O'Malley <[email protected]>
Authored: Thu Jan 26 13:57:45 2017 -0800
Committer: Owen O'Malley <[email protected]>
Committed: Fri Feb 3 08:37:33 2017 -0800

----------------------------------------------------------------------
 common/pom.xml | 9 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java | 52 +-
 .../hive/ql/txn/compactor/TestCompactor.java | 2 +-
 jdbc/pom.xml | 3 +-
 llap-server/pom.xml | 4 +
 .../hive/llap/io/api/impl/LlapRecordReader.java | 9 +-
 .../llap/io/decode/EncodedDataConsumer.java | 6 -
 .../io/decode/GenericColumnVectorProducer.java | 85 +-
 .../llap/io/decode/OrcColumnVectorProducer.java | 3 +-
 .../llap/io/decode/OrcEncodedDataConsumer.java | 30 +-
 .../hive/llap/io/decode/ReadPipeline.java | 6 +-
 .../llap/io/encoded/OrcEncodedDataReader.java | 29 +-
 .../llap/io/encoded/SerDeEncodedDataReader.java | 142 +-
 .../io/encoded/VertorDeserializeOrcWriter.java | 6 +-
 .../llap/io/metadata/OrcStripeMetadata.java | 28 +-
 .../TestIncrementalObjectSizeEstimator.java | 28 +-
 metastore/pom.xml | 38 +-
 orc/pom.xml | 184 -
 .../protobuf-java/org/apache/orc/OrcProto.java | 20179 -----------------
 .../org/apache/orc/BinaryColumnStatistics.java | 27 -
 orc/src/java/org/apache/orc/BloomFilterIO.java | 43 -
 .../org/apache/orc/BooleanColumnStatistics.java | 29 -
 .../java/org/apache/orc/ColumnStatistics.java | 36 -
 .../java/org/apache/orc/CompressionCodec.java | 69 -
 .../java/org/apache/orc/CompressionKind.java | 27 -
 orc/src/java/org/apache/orc/DataReader.java | 76 -
 .../org/apache/orc/DateColumnStatistics.java | 39 -
 .../org/apache/orc/DecimalColumnStatistics.java | 46 -
 .../org/apache/orc/DoubleColumnStatistics.java | 46 -
 .../org/apache/orc/FileFormatException.java | 30 -
 orc/src/java/org/apache/orc/FileMetadata.java | 64 -
 .../org/apache/orc/IntegerColumnStatistics.java | 52 -
 orc/src/java/org/apache/orc/OrcConf.java | 193 -
 orc/src/java/org/apache/orc/OrcFile.java | 566 -
 orc/src/java/org/apache/orc/OrcUtils.java | 624 -
 orc/src/java/org/apache/orc/Reader.java | 375 -
 orc/src/java/org/apache/orc/RecordReader.java | 64 -
 .../org/apache/orc/StringColumnStatistics.java | 43 -
 .../java/org/apache/orc/StripeInformation.java | 59 -
 .../java/org/apache/orc/StripeStatistics.java | 44 -
 .../apache/orc/TimestampColumnStatistics.java | 38 -
 .../java/org/apache/orc/TypeDescription.java | 870 -
 orc/src/java/org/apache/orc/Writer.java | 114 -
 orc/src/java/org/apache/orc/impl/AcidStats.java | 60 -
 .../org/apache/orc/impl/BitFieldReader.java | 217 -
 .../org/apache/orc/impl/BitFieldWriter.java | 73 -
 .../java/org/apache/orc/impl/BufferChunk.java | 85 -
 .../apache/orc/impl/ColumnStatisticsImpl.java | 1101 -
 .../orc/impl/ConvertTreeReaderFactory.java | 2930 ---
 .../apache/orc/impl/DataReaderProperties.java | 124 -
 .../orc/impl/DirectDecompressionCodec.java | 28 -
 .../org/apache/orc/impl/DynamicByteArray.java | 303 -
 .../org/apache/orc/impl/DynamicIntArray.java | 142 -
 .../java/org/apache/orc/impl/HadoopShims.java | 143 -
 .../org/apache/orc/impl/HadoopShimsCurrent.java | 92 -
 .../org/apache/orc/impl/HadoopShims_2_2.java | 101 -
 orc/src/java/org/apache/orc/impl/InStream.java | 498 -
 .../java/org/apache/orc/impl/IntegerReader.java | 82 -
 .../java/org/apache/orc/impl/IntegerWriter.java | 47 -
 .../java/org/apache/orc/impl/MemoryManager.java | 214 -
 .../java/org/apache/orc/impl/OrcAcidUtils.java | 88 -
 orc/src/java/org/apache/orc/impl/OrcIndex.java | 43 -
 orc/src/java/org/apache/orc/impl/OrcTail.java | 140 -
 orc/src/java/org/apache/orc/impl/OutStream.java | 289 -
 .../org/apache/orc/impl/PhysicalFsWriter.java | 529 -
 .../org/apache/orc/impl/PhysicalWriter.java | 122 -
 .../org/apache/orc/impl/PositionProvider.java | 26 -
 .../org/apache/orc/impl/PositionRecorder.java | 25 -
 .../apache/orc/impl/PositionedOutputStream.java | 39 -
 .../java/org/apache/orc/impl/ReaderImpl.java | 764 -
 .../org/apache/orc/impl/RecordReaderImpl.java | 1230 -
 .../org/apache/orc/impl/RecordReaderUtils.java | 578 -
 .../java/org/apache/orc/impl/RedBlackTree.java | 311 -
 .../apache/orc/impl/RunLengthByteReader.java | 174 -
 .../apache/orc/impl/RunLengthByteWriter.java | 106 -
 .../apache/orc/impl/RunLengthIntegerReader.java | 173 -
 .../orc/impl/RunLengthIntegerReaderV2.java | 406 -
 .../apache/orc/impl/RunLengthIntegerWriter.java | 143 -
 .../orc/impl/RunLengthIntegerWriterV2.java | 831 -
 .../org/apache/orc/impl/SchemaEvolution.java | 399 -
 .../org/apache/orc/impl/SerializationUtils.java | 1311 --
 .../orc/impl/SettableUncompressedStream.java | 44 -
 .../java/org/apache/orc/impl/SnappyCodec.java | 108 -
 .../java/org/apache/orc/impl/StreamName.java | 97 -
 .../org/apache/orc/impl/StringRedBlackTree.java | 210 -
 .../org/apache/orc/impl/TreeReaderFactory.java | 2162 --
 .../java/org/apache/orc/impl/WriterImpl.java | 2444 --
 .../java/org/apache/orc/impl/ZeroCopyShims.java | 89 -
 orc/src/java/org/apache/orc/impl/ZlibCodec.java | 169 -
 orc/src/java/org/apache/orc/tools/FileDump.java | 946 -
 .../java/org/apache/orc/tools/JsonFileDump.java | 412 -
 orc/src/protobuf/orc_proto.proto | 230 -
 .../org/apache/orc/TestColumnStatistics.java | 365 -
 .../org/apache/orc/TestNewIntegerEncoding.java | 1373 --
 .../org/apache/orc/TestOrcNullOptimization.java | 415 -
 .../test/org/apache/orc/TestOrcTimezone1.java | 189 -
 .../test/org/apache/orc/TestOrcTimezone2.java | 143 -
 .../test/org/apache/orc/TestOrcTimezone3.java | 126 -
 .../org/apache/orc/TestStringDictionary.java | 290 -
 .../org/apache/orc/TestTypeDescription.java | 91 -
 .../org/apache/orc/TestUnrolledBitPack.java | 114 -
 .../test/org/apache/orc/TestVectorOrcFile.java | 2782 ---
 .../org/apache/orc/impl/TestBitFieldReader.java | 145 -
 .../test/org/apache/orc/impl/TestBitPack.java | 279 -
 .../orc/impl/TestColumnStatisticsImpl.java | 64 -
 .../orc/impl/TestDataReaderProperties.java | 86 -
 .../org/apache/orc/impl/TestDynamicArray.java | 90 -
 .../test/org/apache/orc/impl/TestInStream.java | 314 -
 .../orc/impl/TestIntegerCompressionReader.java | 130 -
 .../org/apache/orc/impl/TestMemoryManager.java | 133 -
 .../org/apache/orc/impl/TestOrcWideTable.java | 64 -
 .../test/org/apache/orc/impl/TestOutStream.java | 43 -
 orc/src/test/org/apache/orc/impl/TestRLEv2.java | 307 -
 .../org/apache/orc/impl/TestReaderImpl.java | 152 -
 .../apache/orc/impl/TestRecordReaderImpl.java | 1691 --
 .../orc/impl/TestRunLengthByteReader.java | 143 -
 .../orc/impl/TestRunLengthIntegerReader.java | 125 -
 .../apache/orc/impl/TestSchemaEvolution.java | 469 -
 .../apache/orc/impl/TestSerializationUtils.java | 201 -
 .../org/apache/orc/impl/TestStreamName.java | 49 -
 .../apache/orc/impl/TestStringRedBlackTree.java | 234 -
 orc/src/test/org/apache/orc/impl/TestZlib.java | 56 -
 .../test/org/apache/orc/tools/TestFileDump.java | 486 -
 .../org/apache/orc/tools/TestJsonFileDump.java | 150 -
 orc/src/test/resources/orc-file-11-format.orc | Bin 373336 -> 0 bytes
 .../resources/orc-file-dump-bloomfilter.out | 179 -
 .../resources/orc-file-dump-bloomfilter2.out | 179 -
 .../orc-file-dump-dictionary-threshold.out | 190 -
 orc/src/test/resources/orc-file-dump.json | 1355 --
 orc/src/test/resources/orc-file-dump.out | 195 -
 orc/src/test/resources/orc-file-has-null.out | 112 -
 packaging/pom.xml | 5 -
 pom.xml | 22 +-
 ql/pom.xml | 15 +-
 .../apache/hadoop/hive/ql/io/orc/OrcFile.java | 6 +
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 5 +-
 .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 58 +-
 .../hadoop/hive/ql/io/orc/WriterImpl.java | 16 +-
 .../orc/encoded/EncodedTreeReaderFactory.java | 211 +-
 .../hadoop/hive/ql/processors/SetProcessor.java | 15 +-
 .../hive/ql/io/orc/TestInputOutputFormat.java | 82 +-
 .../hadoop/hive/ql/io/orc/TestOrcFile.java | 5 +-
 .../TestVectorizedOrcAcidRowBatchReader.java | 2 +-
 .../queries/clientpositive/orc_remove_cols.q | 2 +-
 .../clientpositive/orc_schema_evolution.q | 12 +-
 .../clientpositive/llap/llap_nullscan.q.out | 4 +-
 .../clientpositive/llap/orc_analyze.q.out | 46 +-
 .../clientpositive/llap/orc_llap_counters.q.out | 8 +-
 .../llap/orc_llap_counters1.q.out | 4 +-
 .../clientpositive/llap/orc_merge10.q.out | 72 +-
 .../clientpositive/llap/orc_merge11.q.out | 6 +-
 .../clientpositive/llap/orc_merge12.q.out | 2 +-
 .../clientpositive/llap/orc_ppd_basic.q.out | 8 +-
 .../llap/orc_ppd_schema_evol_3a.q.out | 76 +-
 .../clientpositive/materialized_view_drop.q.out | 6 +-
 .../results/clientpositive/orc_file_dump.q.out | 264 +-
 .../results/clientpositive/orc_merge10.q.out | 16 +-
 .../results/clientpositive/orc_merge11.q.out | 6 +-
 .../results/clientpositive/orc_merge12.q.out | 2 +-
 .../clientpositive/orc_remove_cols.q.out | 4 +-
 .../clientpositive/orc_schema_evolution.q.out | 24 +-
 .../clientpositive/tez/orc_merge12.q.out | 2 +-
 .../hive/ql/io/sarg/SearchArgumentImpl.java | 8 +
 .../ptest2/conf/deployed/master-mr2.properties | 3 -
 .../resources/test-configuration2.properties | 3 -
 165 files changed, 841 insertions(+), 58983 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/common/pom.xml
----------------------------------------------------------------------
diff --git a/common/pom.xml b/common/pom.xml
index e54ec98..fd948f8 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -43,11 +43,6 @@
       <groupId>org.apache.hive</groupId>
       <artifactId>hive-storage-api</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-orc</artifactId>
-      <version>${project.version}</version>
-    </dependency>
     <!-- inter-project -->
     <dependency>
       <groupId>commons-cli</groupId>
@@ -65,6 +60,10 @@
       <version>${commons-lang3.version}</version>
     </dependency>
     <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.eclipse.jetty.aggregate</groupId>
       <artifactId>jetty-all</artifactId>
       <version>${jetty.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 53b9b0c..1949095 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1318,53
+1318,9 @@ public class HiveConf extends Configuration { HIVE_INT_TIMESTAMP_CONVERSION_IN_SECONDS("hive.int.timestamp.conversion.in.seconds", false, "Boolean/tinyint/smallint/int/bigint value is interpreted as milliseconds during the timestamp conversion.\n" + "Set this flag to true to interpret the value as seconds to be consistent with float/double." ), - HIVE_ORC_FILE_MEMORY_POOL("hive.exec.orc.memory.pool", 0.5f, - "Maximum fraction of heap that can be used by ORC file writers"), - HIVE_ORC_WRITE_FORMAT("hive.exec.orc.write.format", null, - "Define the version of the file to write. Possible values are 0.11 and 0.12.\n" + - "If this parameter is not defined, ORC will use the run length encoding (RLE)\n" + - "introduced in Hive 0.12. Any value other than 0.11 results in the 0.12 encoding."), - HIVE_ORC_DEFAULT_STRIPE_SIZE("hive.exec.orc.default.stripe.size", - 64L * 1024 * 1024, - "Define the default ORC stripe size, in bytes."), - HIVE_ORC_DEFAULT_BLOCK_SIZE("hive.exec.orc.default.block.size", 256L * 1024 * 1024, - "Define the default file system block size for ORC files."), - - HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD("hive.exec.orc.dictionary.key.size.threshold", 0.8f, - "If the number of keys in a dictionary is greater than this fraction of the total number of\n" + - "non-null rows, turn off dictionary encoding. Use 1 to always use dictionary encoding."), - HIVE_ORC_DEFAULT_ROW_INDEX_STRIDE("hive.exec.orc.default.row.index.stride", 10000, - "Define the default ORC index stride in number of rows. (Stride is the number of rows\n" + - "an index entry represents.)"), - HIVE_ORC_ROW_INDEX_STRIDE_DICTIONARY_CHECK("hive.orc.row.index.stride.dictionary.check", true, - "If enabled dictionary check will happen after first row index stride (default 10000 rows)\n" + - "else dictionary check will happen before writing first stripe. In both cases, the decision\n" + - "to use dictionary or not will be retained thereafter."), - HIVE_ORC_DEFAULT_BUFFER_SIZE("hive.exec.orc.default.buffer.size", 256 * 1024, - "Define the default ORC buffer size, in bytes."), + HIVE_ORC_BASE_DELTA_RATIO("hive.exec.orc.base.delta.ratio", 8, "The ratio of base writer and\n" + "delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."), - HIVE_ORC_DEFAULT_BLOCK_PADDING("hive.exec.orc.default.block.padding", true, - "Define the default block padding, which pads stripes to the HDFS block boundaries."), - HIVE_ORC_BLOCK_PADDING_TOLERANCE("hive.exec.orc.block.padding.tolerance", 0.05f, - "Define the tolerance for block padding as a decimal fraction of stripe size (for\n" + - "example, the default value 0.05 is 5% of the stripe size). For the defaults of 64Mb\n" + - "ORC stripe and 256Mb HDFS blocks, the default block padding tolerance of 5% will\n" + - "reserve a maximum of 3.2Mb for padding within the 256Mb block. In that case, if the\n" + - "available size within the block is more than 3.2Mb, a new smaller stripe will be\n" + - "inserted to fit within that space. This will make sure that no stripe written will\n" + - "cross block boundaries and cause remote reads within a node local task."), - HIVE_ORC_DEFAULT_COMPRESS("hive.exec.orc.default.compress", "ZLIB", "Define the default compression codec for ORC file"), - - HIVE_ORC_ENCODING_STRATEGY("hive.exec.orc.encoding.strategy", "SPEED", new StringSet("SPEED", "COMPRESSION"), - "Define the encoding strategy to use while writing data. Changing this will\n" + - "only affect the light weight encoding for integers. 
This flag will not\n" + - "change the compression level of higher level compression codec (like ZLIB)."), - - HIVE_ORC_COMPRESSION_STRATEGY("hive.exec.orc.compression.strategy", "SPEED", new StringSet("SPEED", "COMPRESSION"), - "Define the compression strategy to use while writing data. \n" + - "This changes the compression level of higher level compression codec (like ZLIB)."), - HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet("HYBRID", "BI", "ETL"), "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation" + " as opposed to query execution (split generation does not read or cache file footers)." + @@ -1398,12 +1354,6 @@ public class HiveConf extends Configuration { "references for the cached object. Setting this to true can help avoid out of memory\n" + "issues under memory pressure (in some cases) at the cost of slight unpredictability in\n" + "overall query performance."), - HIVE_ORC_SKIP_CORRUPT_DATA("hive.exec.orc.skip.corrupt.data", false, - "If ORC reader encounters corrupt data, this value will be used to determine\n" + - "whether to skip the corrupt data or throw exception. The default behavior is to throw exception."), - - HIVE_ORC_ZEROCOPY("hive.exec.orc.zerocopy", false, - "Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)"), HIVE_LAZYSIMPLE_EXTENDED_BOOLEAN_LITERAL("hive.lazysimple.extended_boolean_literal", false, "LazySimpleSerde uses this property to determine if it treats 'T', 't', 'F', 'f',\n" + http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 0587e80..66ed8ca 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -1165,7 +1165,7 @@ public class TestCompactor { "'transactional'='true'," + "'compactor.mapreduce.map.memory.mb'='2048'," + // 2048 MB memory for compaction map job "'compactorthreshold.hive.compactor.delta.num.threshold'='4'," + // minor compaction if more than 4 delta dirs - "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.5'" + // major compaction if more than 50% + "'compactorthreshold.hive.compactor.delta.pct.threshold'='0.49'" + // major compaction if more than 49% ")", driver); // Insert 5 rows to both tables http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/jdbc/pom.xml b/jdbc/pom.xml index 522eb8d..dba1ae1 100644 --- a/jdbc/pom.xml +++ b/jdbc/pom.xml @@ -196,10 +196,11 @@ <exclude>com.sun.jersey.contribs:*</exclude> <exclude>org.eclipse.jetty.aggregate:*</exclude> <exclude>org.tukaani:*</exclude> - <exclude>org.iq80.snappy:*</exclude> + <exclude>io.airlift:*</exclude> <exclude>org.apache.velocity:*</exclude> <exclude>net.sf.jpam:*</exclude> <exclude>org.apache.avro:*</exclude> + <exclude>org.apache.orc:*</exclude> <exclude>net.sf.opencsv:*</exclude> <exclude>org.antlr:*</exclude> <exclude>org.slf4j:slf4j-log4j12</exclude> http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/pom.xml 
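----------------------------------------------------------------------
A note on the HiveConf hunk above: the removed hive.exec.orc.* writer settings were not dropped outright; their definitions now live in org.apache.orc.OrcConf, which later hunks (OrcColumnVectorProducer, SerDeEncodedDataReader) read directly. A minimal sketch of the new lookup pattern, not taken from this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.OrcConf;

    public class OrcConfLookupExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Each OrcConf entry also carries its legacy hive.exec.orc.* name, so
        // existing site configurations are still honored by these lookups.
        long stripeSize = OrcConf.STRIPE_SIZE.getLong(conf);   // hive.exec.orc.default.stripe.size
        String codec = OrcConf.COMPRESS.getString(conf);       // hive.exec.orc.default.compress
        boolean skipCorrupt =
            OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf);        // hive.exec.orc.skip.corrupt.data
        System.out.printf("stripe=%d codec=%s skipCorrupt=%b%n",
            stripeSize, codec, skipCorrupt);
      }
    }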
---------------------------------------------------------------------- diff --git a/llap-server/pom.xml b/llap-server/pom.xml index 379ffd6..fc392fb 100644 --- a/llap-server/pom.xml +++ b/llap-server/pom.xml @@ -137,6 +137,10 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + </dependency> + <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-runtime-internals</artifactId> <version>${tez.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java index 9ef9ca4..9b1a905 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java @@ -91,7 +91,7 @@ class LlapRecordReader private final ExecutorService executor; private final int columnCount; - private TypeDescription fileSchema; + private SchemaEvolution evolution; public LlapRecordReader(JobConf job, FileSplit split, List<Integer> includedCols, String hostName, ColumnVectorProducer cvp, ExecutorService executor, @@ -147,7 +147,7 @@ class LlapRecordReader feedback = rp = cvp.createReadPipeline(this, split, columnIds, sarg, columnNames, counters, schema, sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo()); - fileSchema = rp.getFileSchema(); + evolution = rp.getSchemaEvolution(); includedColumns = rp.getIncludedColumns(); } @@ -168,10 +168,9 @@ class LlapRecordReader } private boolean checkOrcSchemaEvolution() { - SchemaEvolution schemaEvolution = new SchemaEvolution( - fileSchema, rp.getReaderSchema(), includedColumns); for (int i = 0; i < columnCount; ++i) { - if (!schemaEvolution.isPPDSafeConversion(columnIds.get(i))) { + int colId = columnIds == null ? i : columnIds.get(i); + if (!evolution.isPPDSafeConversion(colId)) { LlapIoImpl.LOG.warn("Unsupported schema evolution! Disabling Llap IO for {}", split); return false; } http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java index 312f008..3cafad1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/EncodedDataConsumer.java @@ -133,10 +133,4 @@ public abstract class EncodedDataConsumer<BatchKey, BatchType extends EncodedCol // We are just a relay; send unpause to encoded data producer. upstreamFeedback.unpause(); } - - @Override - public TypeDescription getFileSchema() { - // TODO: the ORC-specific method should be removed from the interface instead. 
- throw new UnsupportedOperationException(); - } } http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java index 61d385e..0d9779f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/GenericColumnVectorProducer.java @@ -174,10 +174,93 @@ public class GenericColumnVectorProducer implements ColumnVectorProducer { addTypesFromSchema(schema); } + // Copied here until a utility version of this released in ORC. + public static List<TypeDescription> setTypeBuilderFromSchema( + OrcProto.Type.Builder type, TypeDescription schema) { + List<TypeDescription> children = schema.getChildren(); + switch (schema.getCategory()) { + case BOOLEAN: + type.setKind(OrcProto.Type.Kind.BOOLEAN); + break; + case BYTE: + type.setKind(OrcProto.Type.Kind.BYTE); + break; + case SHORT: + type.setKind(OrcProto.Type.Kind.SHORT); + break; + case INT: + type.setKind(OrcProto.Type.Kind.INT); + break; + case LONG: + type.setKind(OrcProto.Type.Kind.LONG); + break; + case FLOAT: + type.setKind(OrcProto.Type.Kind.FLOAT); + break; + case DOUBLE: + type.setKind(OrcProto.Type.Kind.DOUBLE); + break; + case STRING: + type.setKind(OrcProto.Type.Kind.STRING); + break; + case CHAR: + type.setKind(OrcProto.Type.Kind.CHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case VARCHAR: + type.setKind(OrcProto.Type.Kind.VARCHAR); + type.setMaximumLength(schema.getMaxLength()); + break; + case BINARY: + type.setKind(OrcProto.Type.Kind.BINARY); + break; + case TIMESTAMP: + type.setKind(OrcProto.Type.Kind.TIMESTAMP); + break; + case DATE: + type.setKind(OrcProto.Type.Kind.DATE); + break; + case DECIMAL: + type.setKind(OrcProto.Type.Kind.DECIMAL); + type.setPrecision(schema.getPrecision()); + type.setScale(schema.getScale()); + break; + case LIST: + type.setKind(OrcProto.Type.Kind.LIST); + type.addSubtypes(children.get(0).getId()); + break; + case MAP: + type.setKind(OrcProto.Type.Kind.MAP); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + case STRUCT: + type.setKind(OrcProto.Type.Kind.STRUCT); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + for(String field: schema.getFieldNames()) { + type.addFieldNames(field); + } + break; + case UNION: + type.setKind(OrcProto.Type.Kind.UNION); + for(TypeDescription t: children) { + type.addSubtypes(t.getId()); + } + break; + default: + throw new IllegalArgumentException("Unknown category: " + + schema.getCategory()); + } + return children; + } + private void addTypesFromSchema(TypeDescription schema) { // The same thing that WriterImpl does when writing the footer, but w/o the footer. 
OrcProto.Type.Builder type = OrcProto.Type.newBuilder(); - List<TypeDescription> children = OrcUtils.setTypeBuilderFromSchema(type, schema); + List<TypeDescription> children = setTypeBuilderFromSchema(type, schema); orcTypes.add(type.build()); if (children == null) return; for(TypeDescription child : children) { http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java index 7c89e82..ac031aa 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcColumnVectorProducer.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Reporter; import org.apache.orc.TypeDescription; +import org.apache.orc.OrcConf; public class OrcColumnVectorProducer implements ColumnVectorProducer { @@ -64,7 +65,7 @@ public class OrcColumnVectorProducer implements ColumnVectorProducer { this.lowLevelCache = lowLevelCache; this.bufferManager = bufferManager; this.conf = conf; - this._skipCorrupt = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_SKIP_CORRUPT_DATA); + this._skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf); this.cacheMetrics = cacheMetrics; this.ioMetrics = ioMetrics; } http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index d0e70d1..4295c1c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; import org.apache.hadoop.hive.llap.io.metadata.ConsumerFileMetadata; import org.apache.hadoop.hive.llap.io.metadata.ConsumerStripeMetadata; +import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; import org.apache.hadoop.hive.llap.metrics.LlapDaemonIOMetrics; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -47,13 +48,13 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedTreeReaderFactory.Settabl import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; -import org.apache.orc.OrcUtils; import org.apache.orc.TypeDescription; -import org.apache.orc.impl.PhysicalFsWriter; +import org.apache.orc.impl.SchemaEvolution; import org.apache.orc.impl.TreeReaderFactory; import org.apache.orc.impl.TreeReaderFactory.StructTreeReader; import org.apache.orc.impl.TreeReaderFactory.TreeReader; import org.apache.orc.OrcProto; +import org.apache.orc.impl.WriterImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 
+72,7 @@ public class OrcEncodedDataConsumer private final boolean skipCorrupt; // TODO: get rid of this private final QueryFragmentCounters counters; private boolean[] includedColumns; - private TypeDescription readerSchema; + private SchemaEvolution evolution; public OrcEncodedDataConsumer( Consumer<ColumnVectorBatch> consumer, int colCount, boolean skipCorrupt, @@ -86,7 +87,7 @@ public class OrcEncodedDataConsumer assert fileMetadata == null; fileMetadata = f; stripes = new ArrayList<>(f.getStripeCount()); - codec = PhysicalFsWriter.createCodec(f.getCompressionKind()); + codec = WriterImpl.createCodec(fileMetadata.getCompressionKind()); } public void setStripeMetadata(ConsumerStripeMetadata m) { @@ -124,9 +125,13 @@ public class OrcEncodedDataConsumer if (columnReaders == null || !sameStripe) { int[] columnMapping = new int[schema.getChildren().size()]; + TreeReaderFactory.Context context = + new TreeReaderFactory.ReaderContext() + .setSchemaEvolution(evolution) + .writerTimeZone(stripeMetadata.getWriterTimezone()) + .skipCorrupt(skipCorrupt); StructTreeReader treeReader = EncodedTreeReaderFactory.createRootTreeReader( - schema, stripeMetadata.getEncodings(), batch, codec, skipCorrupt, - stripeMetadata.getWriterTimezone(), columnMapping); + schema, stripeMetadata.getEncodings(), batch, codec, context, columnMapping); this.columnReaders = treeReader.getChildReaders(); this.columnMapping = Arrays.copyOf(columnMapping, columnReaders.length); positionInStreams(columnReaders, batch, stripeMetadata); @@ -295,11 +300,6 @@ public class OrcEncodedDataConsumer } @Override - public TypeDescription getFileSchema() { - return OrcUtils.convertTypeFromProtobuf(fileMetadata.getTypes(), 0); - } - - @Override public boolean[] getIncludedColumns() { return includedColumns; } @@ -308,11 +308,11 @@ public class OrcEncodedDataConsumer this.includedColumns = includedColumns; } - public void setReaderSchema(TypeDescription readerSchema) { - this.readerSchema = readerSchema; + public void setSchemaEvolution(SchemaEvolution evolution) { + this.evolution = evolution; } - public TypeDescription getReaderSchema() { - return readerSchema; + public SchemaEvolution getSchemaEvolution() { + return evolution; } } http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java index 36f6c9c..09a350b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ReadPipeline.java @@ -22,10 +22,10 @@ import java.util.concurrent.Callable; import org.apache.hadoop.hive.llap.ConsumerFeedback; import org.apache.hadoop.hive.llap.io.api.impl.ColumnVectorBatch; import org.apache.orc.TypeDescription; +import org.apache.orc.impl.SchemaEvolution; public interface ReadPipeline extends ConsumerFeedback<ColumnVectorBatch> { public Callable<Void> getReadCallable(); - TypeDescription getFileSchema(); // TODO: this is ORC-specific and should be removed - TypeDescription getReaderSchema(); + SchemaEvolution getSchemaEvolution(); boolean[] getIncludedColumns(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java 
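----------------------------------------------------------------------
The ReadPipeline change above and the OrcEncodedDataReader diff below replace the per-reader pair of schema getters with a single shared org.apache.orc.impl.SchemaEvolution, built once from the file schema, the reader schema, and Reader.Options. A self-contained sketch of that construction, with schemas invented for illustration, including the isPPDSafeConversion check that LlapRecordReader now performs per column:

    import java.util.Arrays;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.orc.Reader;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.impl.SchemaEvolution;

    public class SchemaEvolutionExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The file was written with an int column that the reader widens to bigint.
        TypeDescription fileSchema = TypeDescription.fromString("struct<a:int,b:string>");
        TypeDescription readerSchema = TypeDescription.fromString("struct<a:bigint,b:string>");
        boolean[] included = new boolean[readerSchema.getMaximumId() + 1];
        Arrays.fill(included, true);
        Reader.Options options = new Reader.Options(conf).include(included);
        SchemaEvolution evolution = new SchemaEvolution(fileSchema, readerSchema, options);
        for (int col = 0; col <= readerSchema.getMaximumId(); ++col) {
          // Column 0 is the root struct; 1 and 2 are 'a' and 'b'. Widening
          // int -> bigint is a PPD-safe conversion, so LLAP IO stays enabled.
          System.out.println("column " + col + " PPD-safe: "
              + evolution.isPPDSafeConversion(col));
        }
      }
    }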
---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index 7944345..fb00419 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -202,10 +202,11 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> readerSchema = fileMetadata.getSchema(); } globalIncludes = OrcInputFormat.genIncludedColumns(readerSchema, includedColumnIds); - evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, globalIncludes); + Reader.Options options = new Reader.Options(conf).include(globalIncludes); + evolution = new SchemaEvolution(fileMetadata.getSchema(), readerSchema, options); consumer.setFileMetadata(fileMetadata); consumer.setIncludedColumns(globalIncludes); - consumer.setReaderSchema(readerSchema); + consumer.setSchemaEvolution(evolution); } @Override @@ -274,9 +275,6 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> try { if (sarg != null && stride != 0) { // TODO: move this to a common method - TypeDescription schema = OrcUtils.convertTypeFromProtobuf(fileMetadata.getTypes(), 0); - SchemaEvolution evolution = new SchemaEvolution(schema, - null, globalIncludes); int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx( sarg.getLeaves(), evolution); // included will not be null, row options will fill the array with trues if null @@ -369,7 +367,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> ensureMetadataReader(); long startTimeHdfs = counters.startTimeCounter(); stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), - metadataReader, stripe, globalIncludes, sargColumns); + metadataReader, stripe, globalIncludes, sargColumns, + orcReader.getSchema(), orcReader.getWriterVersion()); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs); if (hasFileId && metadataCache != null) { stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); @@ -625,7 +624,8 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> if (value == null) { long startTime = counters.startTimeCounter(); value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0), - metadataReader, si, globalInc, sargColumns); + metadataReader, si, globalInc, sargColumns, orcReader.getSchema(), + orcReader.getWriterVersion()); counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime); if (hasFileId && metadataCache != null) { value = metadataCache.putStripeMetadata(value); @@ -700,10 +700,9 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> List<OrcProto.Type> types = fileMetadata.getTypes(); String[] colNamesForSarg = OrcInputFormat.getSargColumnNames( columnNames, types, globalIncludes, fileMetadata.isOriginalFormat()); - TypeDescription schema = OrcUtils.convertTypeFromProtobuf(types, 0); - SchemaEvolution schemaEvolution = new SchemaEvolution(schema, globalIncludes); sargApp = new RecordReaderImpl.SargApplier(sarg, colNamesForSarg, - rowIndexStride, schemaEvolution); + rowIndexStride, evolution, + OrcFile.WriterVersion.from(fileMetadata.getWriterVersionNum())); } boolean hasAnyData = false; // readState should have been initialized by this time with an empty array. 
@@ -715,6 +714,7 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> if (sargApp != null) { OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod); rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(), + stripeMetadata.getBloomFilterKinds(), stripeMetadata.getBloomFilterIndexes(), true); } boolean isNone = rgsToRead == RecordReaderImpl.SargApplier.READ_NO_RGS, @@ -914,14 +914,19 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> @Override public OrcIndex readRowIndex(StripeInformation stripe, + TypeDescription fileSchema, OrcProto.StripeFooter footer, + boolean ignoreNonUtf8BloomFilter, boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns, + org.apache.orc.OrcFile.WriterVersion version, + OrcProto.Stream.Kind[] bloomFilterKinds, OrcProto.BloomFilterIndex[] bloomFilterIndices ) throws IOException { - return orcDataReader.readRowIndex(stripe, footer, included, indexes, - sargColumns, bloomFilterIndices); + return orcDataReader.readRowIndex(stripe, fileSchema, footer, + ignoreNonUtf8BloomFilter, included, indexes, + sargColumns, version, bloomFilterKinds, bloomFilterIndices); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java index 9ab26e6..8d86d17 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/SerDeEncodedDataReader.java @@ -56,7 +56,8 @@ import org.apache.hadoop.hive.ql.io.HdfsUtils; import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.OrcFile.WriterOptions; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; -import org.apache.hadoop.hive.ql.io.orc.WriterImpl; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.io.orc.encoded.CacheChunk; import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -77,6 +78,7 @@ import org.apache.hive.common.util.FixedSizedObjectPool; import org.apache.hive.common.util.Ref; import org.apache.orc.CompressionCodec; import org.apache.orc.CompressionKind; +import org.apache.orc.OrcConf; import org.apache.orc.OrcUtils; import org.apache.orc.OrcFile.EncodingStrategy; import org.apache.orc.OrcFile.Version; @@ -84,8 +86,9 @@ import org.apache.orc.OrcProto; import org.apache.orc.OrcProto.ColumnEncoding; import org.apache.orc.TypeDescription; import org.apache.orc.impl.OutStream; -import org.apache.orc.impl.OutStream.OutputReceiver; -import org.apache.orc.impl.PhysicalWriter; +import org.apache.orc.PhysicalWriter; +import org.apache.orc.PhysicalWriter.OutputReceiver; +import org.apache.orc.impl.SchemaEvolution; import org.apache.orc.impl.StreamName; import org.apache.tez.common.CallableWithNdc; import org.apache.tez.common.counters.TezCounters; @@ -180,7 +183,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> this.parts = parts; this.daemonConf = new Configuration(daemonConf); // Disable dictionary encoding for the writer. 
- this.daemonConf.setDouble(ConfVars.HIVE_ORC_DICTIONARY_KEY_SIZE_THRESHOLD.varname, 0); + this.daemonConf.setDouble(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.name(), 0); this.split = split; this.columnIds = columnIds; this.allocSize = determineAllocSize(bufferManager, daemonConf); @@ -208,6 +211,9 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> this.reporter = reporter; this.jobConf = jobConf; this.writerIncludes = OrcInputFormat.genIncludedColumns(schema, columnIds); + SchemaEvolution evolution = new SchemaEvolution(schema, + new Reader.Options(jobConf).include(writerIncludes)); + consumer.setSchemaEvolution(evolution); } private static int determineAllocSize(BufferUsageManager bufferManager, Configuration conf) { @@ -331,8 +337,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final List<Integer> columnIds; private final boolean[] writerIncludes; // These are global since ORC reuses objects between stripes. - private final Map<StreamName, OutStream> streams = new HashMap<>(); - private final Map<Integer, List<CacheOutStream>> colStreams = new HashMap<>(); + private final Map<StreamName, OutputReceiver> streams = new HashMap<>(); + private final Map<Integer, List<CacheOutputReceiver>> colStreams = new HashMap<>(); private final boolean doesSourceHaveIncludes; public CacheWriter(BufferUsageManager bufferManager, int bufferSize, @@ -354,10 +360,6 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } @Override - public void initialize() throws IOException { - } - - @Override public void writeFileMetadata(OrcProto.Metadata.Builder builder) throws IOException { } @@ -399,7 +401,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } @Override - public void writePostScript(OrcProto.PostScript.Builder builder) throws IOException { + public long writePostScript(OrcProto.PostScript.Builder builder) { + return 0; } @Override @@ -426,41 +429,45 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } @Override - public long getPhysicalStripeSize() { - return 0; // Always 0, no memory checks. 
- } - - @Override - public boolean isCompressed() { - return false; - } - - @Override - public OutStream getOrCreatePhysicalStream(StreamName name) throws IOException { - OutStream os = streams.get(name); - if (os != null) return os; + public OutputReceiver createDataStream(StreamName name) throws IOException { + OutputReceiver or = streams.get(name); + if (or != null) return or; if (isNeeded(name)) { if (LlapIoImpl.LOG.isTraceEnabled()) { LlapIoImpl.LOG.trace("Creating cache receiver for " + name); } - CacheOutputReceiver or = new CacheOutputReceiver(bufferManager, name); - CacheOutStream cos = new CacheOutStream(name.toString(), bufferSize, null, or); - os = cos; - List<CacheOutStream> list = colStreams.get(name.getColumn()); + CacheOutputReceiver cor = new CacheOutputReceiver(bufferManager, name); + or = cor; + List<CacheOutputReceiver> list = colStreams.get(name.getColumn()); if (list == null) { list = new ArrayList<>(); colStreams.put(name.getColumn(), list); } - list.add(cos); + list.add(cor); } else { if (LlapIoImpl.LOG.isTraceEnabled()) { LlapIoImpl.LOG.trace("Creating null receiver for " + name); } - OutputReceiver or = new NullOutputReceiver(name); - os = new OutStream(name.toString(), bufferSize, null, or); + or = new NullOutputReceiver(name); } - streams.put(name, os); - return os; + streams.put(name, or); + return or; + } + + @Override + public void writeHeader() throws IOException { + + } + + @Override + public void writeIndex(StreamName name, OrcProto.RowIndex.Builder index, CompressionCodec codec) throws IOException { + // TODO: right now we treat each slice as a stripe with a single RG and never bother + // with indexes. In phase 4, we need to add indexing and filtering. + } + + @Override + public void writeBloomFilter(StreamName name, OrcProto.BloomFilterIndex.Builder bloom, CompressionCodec codec) throws IOException { + } @Override @@ -495,21 +502,20 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } currentStripe.rowCount = si.getNumberOfRows(); // ORC writer reuses streams, so we need to clean them here and extract data. - for (Map.Entry<Integer, List<CacheOutStream>> e : colStreams.entrySet()) { - List<CacheOutStream> streams = e.getValue(); + for (Map.Entry<Integer, List<CacheOutputReceiver>> e : colStreams.entrySet()) { + int colIx = e.getKey(); + List<CacheOutputReceiver> streams = e.getValue(); List<CacheStreamData> data = new ArrayList<>(streams.size()); - for (CacheOutStream stream : streams) { - stream.flush(); - List<MemoryBuffer> buffers = stream.receiver.buffers; + for (CacheOutputReceiver receiver : streams) { + List<MemoryBuffer> buffers = receiver.buffers; if (buffers == null) { // This can happen e.g. for a data stream when all the values are null. - LlapIoImpl.LOG.debug("Buffers are null for " + stream.receiver.name); + LlapIoImpl.LOG.debug("Buffers are null for " + receiver.name); } - data.add(new CacheStreamData(stream.isSuppressed(), stream.receiver.name, + data.add(new CacheStreamData(receiver.suppressed, receiver.name, buffers == null ? new ArrayList<MemoryBuffer>() : new ArrayList<>(buffers))); - stream.clear(); + receiver.clear(); } - int colIx = e.getKey(); if (doesSourceHaveIncludes) { int newColIx = getSparseOrcIndexFromDenseDest(colIx); if (LlapIoImpl.LOG.isTraceEnabled()) { @@ -530,40 +536,18 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> return columnIds.get(denseColIx - 1) + 1; } - @Override - public long estimateMemory() { - return 0; // We never ever use any memory. 
- } - - @Override - public void writeIndexStream(StreamName name, - OrcProto.RowIndex.Builder rowIndex) throws IOException { - // TODO: right now we treat each slice as a stripe with a single RG and never bother - // with indexes. In phase 4, we need to add indexing and filtering. - } - private boolean isNeeded(StreamName name) { return doesSourceHaveIncludes || writerIncludes[name.getColumn()]; } @Override - public void writeBloomFilterStream(StreamName streamName, - OrcProto.BloomFilterIndex.Builder bloomFilterIndex) throws IOException { - } - - - @Override public void flush() throws IOException { } @Override - public long getRawWriterPosition() throws IOException { - return -1; // Meaningless for this writer. - } - - @Override - public void appendRawStripe(byte[] stripe, int offset, int length, - OrcProto.StripeInformation.Builder dirEntry) throws IOException { + public void appendRawStripe(ByteBuffer stripe, + OrcProto.StripeInformation.Builder dirEntry + ) throws IOException { throw new UnsupportedOperationException(); // Only used in ACID writer. } @@ -586,6 +570,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> private final StreamName name; private List<MemoryBuffer> buffers = null; private int lastBufferPos = -1; + private boolean suppressed = false; public CacheOutputReceiver(BufferUsageManager bufferManager, StreamName name) { this.bufferManager = bufferManager; @@ -595,6 +580,14 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> public void clear() { buffers = null; lastBufferPos = -1; + suppressed = false; + } + + @Override + public void suppress() { + suppressed = true; + buffers = null; + lastBufferPos = -1; } @Override @@ -655,6 +648,10 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> @Override public void output(ByteBuffer buffer) throws IOException { } + + @Override + public void suppress() { + } } protected Void performDataRead() throws IOException { @@ -1024,7 +1021,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> daemonConf, jobConf, split.getPath(), originalOi, columnIds); cacheWriter = new CacheWriter( bufferManager, allocSize, columnIds, splitIncludes, writer.hasIncludes()); - writer.init(new WriterImpl(cacheWriter, null, + writer.init(OrcFile.createWriter(split.getPath(), createOrcWriterOptions(writer.getDestinationOi()))); int rowsPerSlice = 0; @@ -1106,7 +1103,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> interface EncodingWriter { void writeOneRow(Writable row) throws IOException; StructObjectInspector getDestinationOi(); - void init(WriterImpl orcWriter); + void init(Writer orcWriter); boolean hasIncludes(); void writeIntermediateFooter() throws IOException; void flushIntermediateData() throws IOException; @@ -1114,7 +1111,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } static class DeserialerOrcWriter implements EncodingWriter { - private WriterImpl orcWriter; + private Writer orcWriter; private final Deserializer sourceSerDe; private final StructObjectInspector sourceOi; @@ -1124,7 +1121,7 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> } @Override - public void init(WriterImpl orcWriter) { + public void init(Writer orcWriter) { this.orcWriter = orcWriter; } @@ -1169,7 +1166,8 @@ public class SerDeEncodedDataReader extends CallableWithNdc<Void> return OrcFile.writerOptions(daemonConf).stripeSize(Long.MAX_VALUE).blockSize(Long.MAX_VALUE) .rowIndexStride(Integer.MAX_VALUE) // For now, do not limit this - one RG 
per split .blockPadding(false).compress(CompressionKind.NONE).version(Version.CURRENT) - .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi); + .encodingStrategy(EncodingStrategy.SPEED).bloomFilterColumns(null).inspector(sourceOi) + .physicalWriter(cacheWriter); } private ObjectInspector getOiFromSerDe() throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java index 98fc9df..63a3be2 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/VertorDeserializeOrcWriter.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorDeserializeRow; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; -import org.apache.hadoop.hive.ql.io.orc.WriterImpl; +import org.apache.hadoop.hive.ql.io.orc.Writer; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde.serdeConstants; @@ -58,7 +58,7 @@ import org.apache.hadoop.mapred.TextInputFormat; /** The class that writes rows from a text reader to an ORC writer using VectorDeserializeRow. */ class VertorDeserializeOrcWriter implements EncodingWriter { - private WriterImpl orcWriter; + private Writer orcWriter; private final LazySimpleDeserializeRead deserializeRead; private final VectorDeserializeRow<?> vectorDeserializeRow; private final VectorizedRowBatch sourceBatch, destinationBatch; @@ -255,7 +255,7 @@ class VertorDeserializeOrcWriter implements EncodingWriter { } @Override - public void init(WriterImpl orcWriter) { + public void init(Writer orcWriter) { this.orcWriter = orcWriter; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java index 5ef1678..1f3f7ea 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java @@ -22,27 +22,30 @@ import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; -import java.util.TimeZone; import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator; import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator.ObjectEstimator; import org.apache.hadoop.hive.llap.cache.EvictionDispatcher; import org.apache.hadoop.hive.llap.cache.LlapCacheableBuffer; import org.apache.hadoop.hive.ql.io.SyntheticFileId; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey; import org.apache.orc.DataReader; import org.apache.orc.OrcProto; import 
org.apache.orc.OrcProto.RowIndexEntry; import org.apache.orc.StripeInformation; +import org.apache.orc.TypeDescription; import org.apache.orc.impl.OrcIndex; public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerStripeMetadata { + private final TypeDescription schema; private final OrcBatchKey stripeKey; private final List<OrcProto.ColumnEncoding> encodings; private final List<OrcProto.Stream> streams; private final String writerTimezone; private final long rowCount; private OrcIndex rowIndex; + private OrcFile.WriterVersion writerVersion; private final int estimatedMemUsage; @@ -59,16 +62,20 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt } public OrcStripeMetadata(OrcBatchKey stripeKey, DataReader mr, StripeInformation stripe, - boolean[] includes, boolean[] sargColumns) throws IOException { + boolean[] includes, boolean[] sargColumns, TypeDescription schema, + OrcFile.WriterVersion writerVersion) throws IOException { + this.schema = schema; this.stripeKey = stripeKey; OrcProto.StripeFooter footer = mr.readStripeFooter(stripe); streams = footer.getStreamsList(); encodings = footer.getColumnsList(); writerTimezone = footer.getWriterTimezone(); rowCount = stripe.getNumberOfRows(); - rowIndex = mr.readRowIndex(stripe, footer, includes, null, sargColumns, null); + rowIndex = mr.readRowIndex(stripe, schema, footer, true, includes, null, + sargColumns, writerVersion, null, null); estimatedMemUsage = SIZE_ESTIMATOR.estimate(this, SIZE_ESTIMATORS); + this.writerVersion = writerVersion; } private OrcStripeMetadata(Object id) { @@ -76,6 +83,7 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt encodings = new ArrayList<>(); streams = new ArrayList<>(); writerTimezone = ""; + schema = TypeDescription.fromString("struct<x:int>"); rowCount = estimatedMemUsage = 0; } @@ -90,7 +98,9 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt OrcProto.BloomFilterIndex bfi = OrcProto.BloomFilterIndex.newBuilder().addBloomFilter( OrcProto.BloomFilter.newBuilder().addBitset(0)).build(); dummy.rowIndex = new OrcIndex( - new OrcProto.RowIndex[] { ri }, new OrcProto.BloomFilterIndex[] { bfi }); + new OrcProto.RowIndex[] { ri }, + new OrcProto.Stream.Kind[] { OrcProto.Stream.Kind.BLOOM_FILTER_UTF8 }, + new OrcProto.BloomFilterIndex[] { bfi }); return dummy; } @@ -113,8 +123,10 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt superset[i] = superset[i] || (existing[i] != null); } // TODO: should we save footer to avoid a read here? 
- rowIndex = mr.readRowIndex(stripe, null, superset, rowIndex.getRowGroupIndex(), - sargColumns, rowIndex.getBloomFilterIndex()); + rowIndex = mr.readRowIndex(stripe, schema, null, true, includes, + rowIndex.getRowGroupIndex(), + sargColumns, writerVersion, rowIndex.getBloomFilterKinds(), + rowIndex.getBloomFilterIndex()); // TODO: theoretically, we should re-estimate memory usage here and update memory manager } @@ -126,6 +138,10 @@ public class OrcStripeMetadata extends LlapCacheableBuffer implements ConsumerSt return rowIndex.getRowGroupIndex(); } + public OrcProto.Stream.Kind[] getBloomFilterKinds() { + return rowIndex.getBloomFilterKinds(); + } + public OrcProto.BloomFilterIndex[] getBloomFilterIndexes() { return rowIndex.getBloomFilterIndex(); } http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java index 183fb1b..13c7767 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestIncrementalObjectSizeEstimator.java @@ -29,6 +29,8 @@ import java.util.LinkedHashSet; import org.apache.hadoop.hive.common.io.DiskRangeList; import org.apache.orc.DataReader; +import org.apache.orc.OrcFile; +import org.apache.orc.TypeDescription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.llap.IncrementalObjectSizeEstimator; @@ -59,11 +61,19 @@ public class TestIncrementalObjectSizeEstimator { @Override public OrcIndex readRowIndex(StripeInformation stripe, - OrcProto.StripeFooter footer, - boolean[] included, OrcProto.RowIndex[] indexes, boolean[] sargColumns, - OrcProto.BloomFilterIndex[] bloomFilterIndices) throws IOException { + TypeDescription fileSchema, + OrcProto.StripeFooter footer, + boolean ignoreNonUtf8BloomFilter, + boolean[] included, + OrcProto.RowIndex[] indexes, + boolean[] sargColumns, + OrcFile.WriterVersion version, + OrcProto.Stream.Kind[] bloomFilterKinds, + OrcProto.BloomFilterIndex[] bloomFilterIndices + ) throws IOException { if (isEmpty) { return new OrcIndex(new OrcProto.RowIndex[] { }, + bloomFilterKinds, new OrcProto.BloomFilterIndex[] { }); } OrcProto.ColumnStatistics cs = OrcProto.ColumnStatistics.newBuilder() @@ -102,7 +112,9 @@ public class TestIncrementalObjectSizeEstimator { bfi = OrcProto.BloomFilterIndex.newBuilder().mergeFrom(baos.toByteArray()).build(); } return new OrcIndex( - new OrcProto.RowIndex[] { ri, ri2 }, new OrcProto.BloomFilterIndex[] { bfi }); + new OrcProto.RowIndex[] { ri, ri2 }, + bloomFilterKinds, + new OrcProto.BloomFilterIndex[] { bfi }); } @Override @@ -166,20 +178,20 @@ public class TestIncrementalObjectSizeEstimator { mr.isEmpty = true; StripeInformation si = Mockito.mock(StripeInformation.class); Mockito.when(si.getNumberOfRows()).thenReturn(0L); - osm = new OrcStripeMetadata(stripeKey, mr, si, null, null); + osm = new OrcStripeMetadata(stripeKey, mr, si, null, null, null, null); LOG.info("Estimated " + root.estimate(osm, map) + " for an empty OSM"); mr.doStreamStep = true; - osm = new OrcStripeMetadata(stripeKey, mr, si, null, null); + osm = new OrcStripeMetadata(stripeKey, mr, si, null, null, null, null); LOG.info("Estimated 
" + root.estimate(osm, map) + " for an empty OSM after serde"); mr.isEmpty = false; stripeKey = new OrcBatchKey(0, 0, 0); - osm = new OrcStripeMetadata(stripeKey, mr, si, null, null); + osm = new OrcStripeMetadata(stripeKey, mr, si, null, null, null, null); LOG.info("Estimated " + root.estimate(osm, map) + " for a test OSM"); osm.resetRowIndex(); LOG.info("Estimated " + root.estimate(osm, map) + " for a test OSM w/o row index"); mr.doStreamStep = true; - osm = new OrcStripeMetadata(stripeKey, mr, si, null, null); + osm = new OrcStripeMetadata(stripeKey, mr, si, null, null, null, null); LOG.info("Estimated " + root.estimate(osm, map) + " for a test OSM after serde"); osm.resetRowIndex(); LOG.info("Estimated " + root.estimate(osm, map) + " for a test OSM w/o row index after serde"); http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/metastore/pom.xml ---------------------------------------------------------------------- diff --git a/metastore/pom.xml b/metastore/pom.xml index c1b707a..35752ff 100644 --- a/metastore/pom.xml +++ b/metastore/pom.xml @@ -146,17 +146,33 @@ <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <optional>true</optional> - <exclusions> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <exclusion> - <groupId>commmons-logging</groupId> - <artifactId>commons-logging</artifactId> - </exclusion> - </exclusions> - </dependency> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>commmons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/pom.xml ---------------------------------------------------------------------- diff --git a/orc/pom.xml b/orc/pom.xml deleted file mode 100644 index f75b91c..0000000 --- a/orc/pom.xml +++ /dev/null @@ -1,184 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-  <modelVersion>4.0.0</modelVersion>
-  <parent>
-    <groupId>org.apache.hive</groupId>
-    <artifactId>hive</artifactId>
-    <version>2.2.0-SNAPSHOT</version>
-    <relativePath>../pom.xml</relativePath>
-  </parent>
-
-  <artifactId>hive-orc</artifactId>
-  <packaging>jar</packaging>
-  <name>Hive ORC</name>
-
-  <properties>
-    <hive.path.to.root>..</hive.path.to.root>
-  </properties>
-
-  <dependencies>
-    <dependency>
-      <groupId>org.apache.hive</groupId>
-      <artifactId>hive-storage-api</artifactId>
-    </dependency>
-
-    <!-- inter-project -->
-    <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>${protobuf.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>jsp-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-hdfs</artifactId>
-      <version>${hadoop.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>javax.servlet.jsp</groupId>
-          <artifactId>jsp-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.mortbay.jetty</groupId>
-          <artifactId>jetty-util</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.apache.avro</groupId>
-          <artifactId>avro</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
-      <groupId>org.iq80.snappy</groupId>
-      <artifactId>snappy</artifactId>
-      <version>${snappy.version}</version>
-    </dependency>
-
-    <!-- test inter-project -->
-    <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>${junit.version}</version>
-      <scope>test</scope>
-    </dependency>
-    <dependency>
-      <groupId>org.mockito</groupId>
-      <artifactId>mockito-all</artifactId>
-      <version>${mockito-all.version}</version>
-      <scope>test</scope>
-    </dependency>
-  </dependencies>
-
-  <profiles>
-    <profile>
-      <id>protobuf</id>
-      <build>
-        <plugins>
-          <plugin>
-            <groupId>com.github.os72</groupId>
-            <artifactId>protoc-jar-maven-plugin</artifactId>
-            <version>3.0.0-a3</version>
-            <executions>
-              <execution>
-                <phase>generate-sources</phase>
-                <goals>
-                  <goal>run</goal>
-                </goals>
-                <configuration>
-                  <protocVersion>2.5.0</protocVersion>
-                  <addSources>none</addSources>
-                  <outputDirectory>src/gen/protobuf-java</outputDirectory>
-                  <includeDirectories>
-                    <include>src/protobuf</include>
-                  </includeDirectories>
-                  <inputDirectories>
-                    <include>src/protobuf</include>
-                  </inputDirectories>
-                </configuration>
-              </execution>
-            </executions>
-          </plugin>
-        </plugins>
-      </build>
-    </profile>
-  </profiles>
-
-  <build>
-    <sourceDirectory>${basedir}/src/java</sourceDirectory>
-    <testSourceDirectory>${basedir}/src/test</testSourceDirectory>
-    <testResources>
-      <testResource>
-        <directory>${basedir}/src/test/resources</directory>
-      </testResource>
-    </testResources>
-    <plugins>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>src/gen/protobuf-java</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-    </plugins>
-  </build>
-</project>
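
The readRowIndex changes above follow the ORC 1.3 DataReader API, which now threads the file schema, writer version, and bloom filter stream kinds through to the index reader, and the OrcIndex result gains a constructor argument for the stream kinds. A minimal sketch of a stub against the new signature, modeled on the test's mock reader above (the class name is illustrative and not part of the commit; org.apache.orc.impl.OrcIndex is assumed to be the OrcIndex type in scope):

  import java.io.IOException;

  import org.apache.orc.OrcFile;
  import org.apache.orc.OrcProto;
  import org.apache.orc.StripeInformation;
  import org.apache.orc.TypeDescription;
  import org.apache.orc.impl.OrcIndex;

  public class ReadRowIndexStub {
    // Same shape as org.apache.orc.DataReader#readRowIndex after this change;
    // a real reader would decode the stripe's index streams instead of
    // returning empty arrays.
    public OrcIndex readRowIndex(StripeInformation stripe,
                                 TypeDescription fileSchema,
                                 OrcProto.StripeFooter footer,
                                 boolean ignoreNonUtf8BloomFilter,
                                 boolean[] included,
                                 OrcProto.RowIndex[] indexes,
                                 boolean[] sargColumns,
                                 OrcFile.WriterVersion version,
                                 OrcProto.Stream.Kind[] bloomFilterKinds,
                                 OrcProto.BloomFilterIndex[] bloomFilterIndices)
        throws IOException {
      // OrcIndex now carries the bloom filter stream kinds alongside the
      // row-group and bloom filter indexes, matching the test changes above.
      return new OrcIndex(new OrcProto.RowIndex[] { },
          bloomFilterKinds,
          new OrcProto.BloomFilterIndex[] { });
    }
  }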
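
With orc/pom.xml deleted, the hive-orc artifact is no longer built; per the commit summary, those classes now come from the upstream Apache ORC 1.3.1 release. As a sketch, a module that previously depended on hive-orc would instead declare something like the following (the exact dependency wiring lives in the pom changes outside this excerpt):

  <dependency>
    <groupId>org.apache.orc</groupId>
    <artifactId>orc-core</artifactId>
    <version>1.3.1</version>
  </dependency>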
