TAJO-2102: Migrate to Apache Orc from Presto's one. Closes #985
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/68263585 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/68263585 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/68263585 Branch: refs/heads/master Commit: 68263585296e30f93e541c36908a652df7398b9e Parents: 9fcc9fd Author: Jihoon Son <[email protected]> Authored: Wed Mar 23 10:39:31 2016 +0900 Committer: Jihoon Son <[email protected]> Committed: Wed Mar 23 10:39:59 2016 +0900 ---------------------------------------------------------------------- .../java/org/apache/tajo/catalog/TypeDesc.java | 4 + .../tajo-catalog-drivers/tajo-hive/pom.xml | 198 ++- .../tajo/catalog/store/HiveCatalogStore.java | 15 +- .../tajo/catalog/store/HiveCatalogUtil.java | 3 + .../catalog/store/TestHiveCatalogStore.java | 1 + .../org/apache/tajo/cli/tools/TajoDump.java | 2 +- .../org/apache/tajo/datum/TimestampDatum.java | 2 +- .../apache/tajo/storage/StorageConstants.java | 6 +- .../tajo/engine/query/TestSelectQuery.java | 19 - .../apache/tajo/storage/TestQueryOnOrcFile.java | 79 + .../TestQueryOnOrcFile/timezoned/timezoned1.tbl | 3 + .../TestSelectQuery/timezoned/table1.tbl | 3 - .../TestSelectQuery/timezoned/timezoned1.tbl | 3 + .../datetime_table_timezoned_ddl.sql | 5 + .../datetime_table_timezoned_orc_ddl.sql | 4 + .../TestQueryOnOrcFile/testTimezone1.sql | 1 + .../datetime_table_timezoned_orc_ddl.sql | 4 - .../TestSelectQuery/testTimezonedORCTable.sql | 2 - .../TestQueryOnOrcFile/testTimezone1.result | 5 + .../TestQueryOnOrcFile/testTimezone2.result | 5 + .../TestQueryOnOrcFile/testTimezone3.result | 5 + .../TestQueryOnOrcFile/testTimezone4.result | 5 + .../testTimezonedORCTable.result | 5 - tajo-dist/pom.xml | 14 +- tajo-dist/src/main/bin/tajo | 10 +- tajo-project/pom.xml | 3 +- .../src/main/resources/storage-default.xml | 2 +- .../src/test/resources/storage-default.xml | 2 +- tajo-storage/tajo-storage-hdfs/pom.xml | 34 +- 
.../apache/tajo/storage/orc/ORCAppender.java | 93 +- .../org/apache/tajo/storage/orc/ORCScanner.java | 332 ---- .../org/apache/tajo/storage/orc/OrcScanner.java | 460 ++++++ .../objectinspector/ObjectInspectorFactory.java | 91 - .../TajoBlobObjectInspector.java | 82 - .../TajoBooleanObjectInspector.java | 76 - .../TajoDateObjectInspector.java | 73 - .../TajoDoubleObjectInspector.java | 76 - .../TajoFloatObjectInspector.java | 76 - .../objectinspector/TajoIntObjectInspector.java | 76 - .../TajoLongObjectInspector.java | 76 - .../TajoNullObjectInspector.java | 69 - .../TajoPrimitiveObjectInspector.java | 38 - .../TajoShortObjectInspector.java | 76 - .../TajoStringObjectInspector.java | 71 - .../TajoStructObjectInspector.java | 122 -- .../TajoTimestampObjectInspector.java | 73 - .../thirdparty/orc/BinaryColumnStatistics.java | 25 - .../storage/thirdparty/orc/BitFieldWriter.java | 69 - .../storage/thirdparty/orc/BloomFilterIO.java | 42 - .../thirdparty/orc/BooleanColumnStatistics.java | 27 - .../thirdparty/orc/ByteBufferAllocatorPool.java | 102 ++ .../thirdparty/orc/ByteBufferPoolAdapter.java | 41 + .../thirdparty/orc/ColumnStatistics.java | 36 - .../thirdparty/orc/ColumnStatisticsImpl.java | 1017 ------------ .../thirdparty/orc/CompressionCodec.java | 68 - .../storage/thirdparty/orc/CompressionKind.java | 27 - .../thirdparty/orc/DateColumnStatistics.java | 37 - .../thirdparty/orc/DecimalColumnStatistics.java | 45 - .../orc/DirectDecompressionCodec.java | 26 - .../thirdparty/orc/DoubleColumnStatistics.java | 44 - .../thirdparty/orc/DynamicByteArray.java | 303 ---- .../storage/thirdparty/orc/DynamicIntArray.java | 142 -- .../thirdparty/orc/HdfsOrcDataSource.java | 133 -- .../thirdparty/orc/IntegerColumnStatistics.java | 50 - .../storage/thirdparty/orc/IntegerWriter.java | 47 - .../storage/thirdparty/orc/MemoryManager.java | 212 --- .../tajo/storage/thirdparty/orc/Metadata.java | 45 - .../tajo/storage/thirdparty/orc/OrcFile.java | 389 +++-- 
.../storage/thirdparty/orc/OrcRecordReader.java | 454 +++++ .../tajo/storage/thirdparty/orc/OrcUtils.java | 242 +-- .../tajo/storage/thirdparty/orc/OutStream.java | 286 ---- .../thirdparty/orc/PositionRecorder.java | 25 - .../thirdparty/orc/PositionedOutputStream.java | 38 - .../thirdparty/orc/RecordReaderUtils.java | 393 +++++ .../storage/thirdparty/orc/RedBlackTree.java | 309 ---- .../thirdparty/orc/RunLengthByteWriter.java | 106 -- .../thirdparty/orc/RunLengthIntegerWriter.java | 143 -- .../orc/RunLengthIntegerWriterV2.java | 832 ---------- .../thirdparty/orc/SerializationUtils.java | 844 ---------- .../storage/thirdparty/orc/SnappyCodec.java | 109 -- .../tajo/storage/thirdparty/orc/StreamName.java | 95 -- .../thirdparty/orc/StringColumnStatistics.java | 41 - .../thirdparty/orc/StringRedBlackTree.java | 202 --- .../thirdparty/orc/StripeInformation.java | 59 - .../thirdparty/orc/StripeStatistics.java | 42 - .../orc/TimestampColumnStatistics.java | 38 - .../thirdparty/orc/TreeReaderFactory.java | 1557 ++++++++++++++++++ .../tajo/storage/thirdparty/orc/Writer.java | 2 + .../tajo/storage/thirdparty/orc/WriterImpl.java | 813 +++++---- .../storage/thirdparty/orc/ZeroCopyAdapter.java | 57 + .../tajo/storage/thirdparty/orc/ZlibCodec.java | 169 -- .../src/main/proto/orc_proto.proto | 217 --- .../tajo/storage/TestCompressionStorages.java | 13 +- .../org/apache/tajo/storage/TestStorages.java | 69 +- .../resources/dataset/testVariousTypes.avsc | 3 +- .../src/test/resources/storage-default.xml | 2 +- 96 files changed, 4214 insertions(+), 8277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TypeDesc.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TypeDesc.java 
b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TypeDesc.java index 3bd0f00..3ca83f9 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TypeDesc.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/TypeDesc.java @@ -55,6 +55,10 @@ public class TypeDesc { } } + public Schema getNestedSchema() { + return nestedRecordSchema; + } + public int hashCode() { return Objects.hashCode(dataType.hashCode(), nestedRecordSchema); } http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml index 1a8a188..d848461 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/pom.xml @@ -33,8 +33,6 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <parquet.version>1.5.0</parquet.version> - <parquet.format.version>2.1.0</parquet.format.version> </properties> <build> @@ -136,19 +134,35 @@ </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> + <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> + <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${hadoop.version}</version> <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>hadoop-yarn-common</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + 
<artifactId>netty</artifactId> + <groupId>io.netty</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> + <artifactId>hive-metastore</artifactId> <version>${hive.version}</version> <scope>provided</scope> <exclusions> @@ -158,129 +172,201 @@ </exclusion> <exclusion> <groupId>org.apache.hive</groupId> - <artifactId>hive-contrib</artifactId> + <artifactId>hive-serde</artifactId> </exclusion> <exclusion> <groupId>org.apache.hive</groupId> - <artifactId>hive-hbase-handler</artifactId> + <artifactId>hive-shimss</artifactId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-metastore</artifactId> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-serde</artifactId> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-shims</artifactId> + <groupId>com.jolbox</groupId> + <artifactId>bonecp</artifactId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-testutils</artifactId> + <artifactId>tephra-hbase-compat-1.0</artifactId> + <groupId>co.cask.tephra</groupId> </exclusion> <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libfb303</artifactId> + <artifactId>tephra-core</artifactId> + <groupId>co.cask.tephra</groupId> </exclusion> <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> + <artifactId>tephra-api</artifactId> + <groupId>co.cask.tephra</groupId> </exclusion> <exclusion> - <groupId>com.jolbox</groupId> - <artifactId>bonecp</artifactId> + <artifactId>hbase-client</artifactId> + <groupId>org.apache.hbase</groupId> </exclusion> <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> + 
<artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <groupId>org.apache.hadoop</groupId> </exclusion> <exclusion> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-core</artifactId> + <artifactId>antlr-runtime</artifactId> + <groupId>org.antlr</groupId> </exclusion> <exclusion> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-avatica</artifactId> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> - <artifactId>hive-metastore</artifactId> + <artifactId>hive-common</artifactId> <version>${hive.version}</version> <scope>provided</scope> <exclusions> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-common</artifactId> + <artifactId>jetty-all</artifactId> + <groupId>org.eclipse.jetty.aggregate</groupId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-serde</artifactId> + <artifactId>javax.servlet</artifactId> + <groupId>org.eclipse.jetty.orbit</groupId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-shimss</artifactId> + <artifactId>joda-time</artifactId> + <groupId>joda-time</groupId> </exclusion> <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libfb303</artifactId> + <artifactId>jackson-databind</artifactId> + <groupId>com.fasterxml.jackson.core</groupId> </exclusion> <exclusion> - <groupId>org.apache.thrift</groupId> - <artifactId>libthrift</artifactId> + <artifactId>metrics-json</artifactId> + <groupId>io.dropwizard.metrics</groupId> </exclusion> <exclusion> - <groupId>com.jolbox</groupId> - <artifactId>bonecp</artifactId> + <artifactId>metrics-jvm</artifactId> + <groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>metrics-core</artifactId> + 
<groupId>io.dropwizard.metrics</groupId> + </exclusion> + <exclusion> + <artifactId>ant</artifactId> + <groupId>org.apache.ant</groupId> + </exclusion> + <exclusion> + <artifactId>json</artifactId> + <groupId>org.json</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-web</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-1.2-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hive</groupId> - <artifactId>hive-cli</artifactId> + <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <scope>provided</scope> <exclusions> <exclusion> + <artifactId>hive-ant</artifactId> <groupId>org.apache.hive</groupId> - <artifactId>hive-common</artifactId> </exclusion> <exclusion> + <artifactId>hive-llap-tez</artifactId> <groupId>org.apache.hive</groupId> - <artifactId>hive-exec</artifactId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-metastore</artifactId> + <artifactId>ST4</artifactId> + <groupId>org.antlr</groupId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-serde</artifactId> + <artifactId>ivy</artifactId> + <groupId>org.apache.ivy</groupId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-service</artifactId> + <artifactId>curator-framework</artifactId> + <groupId>org.apache.curator</groupId> </exclusion> <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-shims</artifactId> + <artifactId>apache-curator</artifactId> + <groupId>org.apache.curator</groupId> </exclusion> <exclusion> - <groupId>com.jolbox</groupId> - <artifactId>bonecp</artifactId> + <artifactId>groovy-all</artifactId> + <groupId>org.codehaus.groovy</groupId> + </exclusion> + <exclusion> + 
<artifactId>calcite-core</artifactId> + <groupId>org.apache.calcite</groupId> + </exclusion> + <exclusion> + <artifactId>calcite-avatica</artifactId> + <groupId>org.apache.calcite</groupId> + </exclusion> + <exclusion> + <artifactId>stax-api</artifactId> + <groupId>stax</groupId> </exclusion> <exclusion> - <groupId>jline</groupId> <artifactId>jline</artifactId> + <groupId>jline</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-1.2-api</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>log4j-slf4j-impl</artifactId> + <groupId>org.apache.logging.log4j</groupId> + </exclusion> + <exclusion> + <artifactId>ant</artifactId> + <groupId>org.apache.ant</groupId> + </exclusion> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>antlr-runtime</artifactId> + <groupId>org.antlr</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-serde</artifactId> + <version>${hive.version}</version> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>opencsv</artifactId> + <groupId>net.sf.opencsv</groupId> </exclusion> </exclusions> </dependency> <dependency> - <groupId>com.twitter</groupId> - <artifactId>parquet-hive-bundle</artifactId> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop-bundle</artifactId> <version>${parquet.version}</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java 
index 63f18b6..95cbf18 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -38,12 +38,15 @@ import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.orc.OrcConf; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.apache.tajo.BuiltinStorages; import org.apache.tajo.TajoConstants; import org.apache.tajo.algebra.Expr; import org.apache.tajo.algebra.IsNullPredicate; import org.apache.tajo.algebra.JsonHelper; import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.*; @@ -56,10 +59,8 @@ import org.apache.tajo.plan.util.PartitionFilterAlgebraVisitor; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; import org.apache.thrift.TException; -import parquet.hadoop.ParquetOutputFormat; import java.io.File; -import java.io.IOException; import java.util.*; public class HiveCatalogStore extends CatalogConstants implements CatalogStore { @@ -564,6 +565,16 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore { table.putToParameters(ParquetOutputFormat.COMPRESSION, tableDesc.getMeta().getProperty(ParquetOutputFormat.COMPRESSION)); } + } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.ORC)) { + StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.ORC); + sd.setInputFormat(descriptor.getInputFormat()); + sd.setOutputFormat(descriptor.getOutputFormat()); + 
sd.getSerdeInfo().setSerializationLib(descriptor.getSerde()); + + if (tableDesc.getMeta().containsProperty(OrcConf.COMPRESS.getAttribute())) { + table.putToParameters(OrcConf.COMPRESS.getAttribute(), + tableDesc.getMeta().getProperty(OrcConf.COMPRESS.getAttribute())); + } } else { throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore"); } http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java index bbb7ade..87b391e 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java @@ -22,6 +22,7 @@ import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; @@ -137,6 +138,8 @@ public class HiveCatalogUtil { return BuiltinStorages.PARQUET; } else if (AvroSerDe.class.getName().equals(serde)) { return BuiltinStorages.AVRO; + } else if (OrcSerde.class.getName().equals(serde)) { + return BuiltinStorages.ORC; } else { throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat)); } 
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index 7e1a3a4..46935fc 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -78,6 +78,7 @@ public class TestHiveCatalogStore { conf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehousePath.toUri().toString()); conf.set(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, jdbcUri); conf.set(TajoConf.ConfVars.WAREHOUSE_DIR.varname, warehousePath.toUri().toString()); + conf.setBoolean("datanucleus.schema.autoCreateAll", true); // create local HiveCatalogStore. TajoConf tajoConf = new TajoConf(conf); http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java ---------------------------------------------------------------------- diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java index 4df418f..c9fa2b4 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java @@ -208,7 +208,7 @@ public class TajoDump { } } writer.write("\n\n"); - } catch (Exception e) { + } catch (Throwable e) { // dump for each table can throw any exception. We need to skip the exception case. // here, the error message prints out via stderr. 
System.err.println("ERROR:" + tableName + "," + e.getMessage()); http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java index 5b4c152..f69e7da 100644 --- a/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java +++ b/tajo-common/src/main/java/org/apache/tajo/datum/TimestampDatum.java @@ -125,7 +125,7 @@ public class TimestampDatum extends Datum { /** * - * @param tm TimeMEta + * @param tm TimeMeta * @param timeZone Timezone * @param includeTimeZone Add timezone if it is true. It is usually used for TIMEZONEZ * @return A timestamp string http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index 097963c..4612323 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -89,11 +89,7 @@ public class StorageConstants { public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB public static final String ORC_COMPRESSION = "orc.compress"; - public static final String ORC_COMPRESSION_KIND_NONE = "none"; - public static final String ORC_COMPRESSION_KIND_SNAPPY = "snappy"; - public static final String ORC_COMPRESSION_KIND_LZO = "lzo"; - public static final String ORC_COMPRESSION_KIND_ZIP = "zlip"; - public static final String DEFAULT_ORC_COMPRESSION_KIND = ORC_COMPRESSION_KIND_NONE; + public static final String DEFAULT_ORC_COMPRESSION_KIND = "none"; 
public static final String ORC_BUFFER_SIZE = "orc.buffer.size"; public static final String DEFAULT_ORC_BUFFER_SIZE = "262144"; // 256KB http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index e55acf1..a2dec50 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -682,25 +682,6 @@ public class TestSelectQuery extends QueryTestCaseBase { executeString("DROP TABLE IF EXISTS timezoned_load2 PURGE"); } } - - @Test - public void testTimezonedORCTable() throws Exception { - try { - - executeDDL("datetime_table_timezoned_ddl.sql", "timezoned", "timezoned"); - executeDDL("datetime_table_timezoned_orc_ddl.sql", null, "timezoned_orc"); - - executeString("INSERT OVERWRITE INTO timezoned_orc SELECT t_timestamp, t_date FROM timezoned"); - - ResultSet res = executeQuery(); - assertResultSet(res, "testTimezonedORCTable.result"); - executeString("SET TIME ZONE 'GMT'"); - cleanupQuery(res); - } finally { - executeString("DROP TABLE IF EXISTS timezoned"); - executeString("DROP TABLE IF EXISTS timezoned_orc PURGE"); - } - } @Test public void testMultiBytesDelimiter1() throws Exception { http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestQueryOnOrcFile.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestQueryOnOrcFile.java b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestQueryOnOrcFile.java new file mode 100644 index 0000000..29d132e --- /dev/null +++ 
b/tajo-core-tests/src/test/java/org/apache/tajo/storage/TestQueryOnOrcFile.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryTestCaseBase; +import org.junit.*; +import org.junit.experimental.categories.Category; + +import java.sql.ResultSet; + +@Category(IntegrationTest.class) +public class TestQueryOnOrcFile extends QueryTestCaseBase { + + @Before + public void setup() throws Exception { + executeDDL("datetime_table_timezoned_ddl.sql", "timezoned", "timezoned"); + executeDDL("datetime_table_timezoned_orc_ddl.sql", null, "timezoned_orc"); + + executeString("INSERT OVERWRITE INTO timezoned_orc SELECT t_timestamp, t_date FROM timezoned"); + } + + @After + public void teardown() throws Exception { + executeString("DROP TABLE IF EXISTS timezoned"); + executeString("DROP TABLE IF EXISTS timezoned_orc PURGE"); + } + + @Test + public void testTimezone1() throws Exception { + executeString("SET TIME ZONE 'GMT+9'"); + ResultSet res = executeQuery(); + assertResultSet(res); + executeString("SET TIME ZONE 'GMT'"); + cleanupQuery(res); + } + + @Test + public void testTimezone2() throws Exception { 
+ executeString("SET TIME ZONE 'GMT+1'"); + ResultSet res = executeString("select * from timezoned_orc"); + assertResultSet(res); + executeString("SET TIME ZONE 'GMT'"); + cleanupQuery(res); + } + + @Test + public void testTimezone3() throws Exception { + executeString("SET TIME ZONE 'GMT'"); + ResultSet res = executeString("select * from timezoned_orc"); + assertResultSet(res); + cleanupQuery(res); + } + + @Test + public void testTimezone4() throws Exception { + executeString("\\set TIMEZONE 'GMT-5'"); + ResultSet res = executeString("select * from timezoned_orc"); + assertResultSet(res); + executeString("SET TIME ZONE 'GMT'"); + cleanupQuery(res); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/dataset/TestQueryOnOrcFile/timezoned/timezoned1.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestQueryOnOrcFile/timezoned/timezoned1.tbl b/tajo-core-tests/src/test/resources/dataset/TestQueryOnOrcFile/timezoned/timezoned1.tbl new file mode 100644 index 0000000..74b2e1b --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestQueryOnOrcFile/timezoned/timezoned1.tbl @@ -0,0 +1,3 @@ +1980-4-1 01:50:30.010|01:50:30.010|1980-04-01 +80/4/1 1:50:30 AM|1:50:30 AM|80/4/1 +1980 April 1 1:50:30|1:50:30|1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/table1.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/table1.tbl b/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/table1.tbl deleted file mode 100644 index 74b2e1b..0000000 --- a/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/table1.tbl +++ /dev/null @@ -1,3 +0,0 @@ -1980-4-1 01:50:30.010|01:50:30.010|1980-04-01 -80/4/1 
1:50:30 AM|1:50:30 AM|80/4/1 -1980 April 1 1:50:30|1:50:30|1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/timezoned1.tbl ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/timezoned1.tbl b/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/timezoned1.tbl new file mode 100644 index 0000000..74b2e1b --- /dev/null +++ b/tajo-core-tests/src/test/resources/dataset/TestSelectQuery/timezoned/timezoned1.tbl @@ -0,0 +1,3 @@ +1980-4-1 01:50:30.010|01:50:30.010|1980-04-01 +80/4/1 1:50:30 AM|1:50:30 AM|80/4/1 +1980 April 1 1:50:30|1:50:30|1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_ddl.sql b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_ddl.sql new file mode 100644 index 0000000..9c5d30d --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_ddl.sql @@ -0,0 +1,5 @@ +CREATE EXTERNAL TABLE ${0} ( + t_timestamp TIMESTAMP, + t_time TIME, + t_date DATE +) USING TEXT WITH ('timezone' = 'GMT+9') LOCATION ${table.path} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_orc_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_orc_ddl.sql 
b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_orc_ddl.sql new file mode 100644 index 0000000..49e1f7e --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/datetime_table_timezoned_orc_ddl.sql @@ -0,0 +1,4 @@ +CREATE TABLE ${0} ( + t_timestamp TIMESTAMP, + t_date DATE +) USING ORC WITH ('timezone' = 'GMT+9') http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/testTimezone1.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/testTimezone1.sql b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/testTimezone1.sql new file mode 100644 index 0000000..2464c97 --- /dev/null +++ b/tajo-core-tests/src/test/resources/queries/TestQueryOnOrcFile/testTimezone1.sql @@ -0,0 +1 @@ +SELECT * FROM timezoned_orc; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/queries/TestSelectQuery/datetime_table_timezoned_orc_ddl.sql ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/queries/TestSelectQuery/datetime_table_timezoned_orc_ddl.sql b/tajo-core-tests/src/test/resources/queries/TestSelectQuery/datetime_table_timezoned_orc_ddl.sql deleted file mode 100644 index 49e1f7e..0000000 --- a/tajo-core-tests/src/test/resources/queries/TestSelectQuery/datetime_table_timezoned_orc_ddl.sql +++ /dev/null @@ -1,4 +0,0 @@ -CREATE TABLE ${0} ( - t_timestamp TIMESTAMP, - t_date DATE -) USING ORC WITH ('timezone' = 'GMT+9') http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testTimezonedORCTable.sql ---------------------------------------------------------------------- diff --git 
a/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testTimezonedORCTable.sql b/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testTimezonedORCTable.sql deleted file mode 100644 index 1d898bd..0000000 --- a/tajo-core-tests/src/test/resources/queries/TestSelectQuery/testTimezonedORCTable.sql +++ /dev/null @@ -1,2 +0,0 @@ -SET SESSION TIMEZONE = 'GMT+9'; -SELECT * FROM timezoned_orc; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone1.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone1.result b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone1.result new file mode 100644 index 0000000..39f593b --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone1.result @@ -0,0 +1,5 @@ +t_timestamp,t_date +------------------------------- +1980-04-01 01:50:30.01,1980-04-01 +1980-04-01 01:50:30,1980-04-01 +1980-04-01 01:50:30,1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone2.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone2.result b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone2.result new file mode 100644 index 0000000..c0e5cef --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone2.result @@ -0,0 +1,5 @@ +t_timestamp,t_date +------------------------------- +1980-03-31 17:50:30.01,1980-04-01 +1980-03-31 17:50:30,1980-04-01 +1980-03-31 17:50:30,1980-04-01 \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone3.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone3.result b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone3.result new file mode 100644 index 0000000..916f4be --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone3.result @@ -0,0 +1,5 @@ +t_timestamp,t_date +------------------------------- +1980-03-31 16:50:30.01,1980-04-01 +1980-03-31 16:50:30,1980-04-01 +1980-03-31 16:50:30,1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone4.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone4.result b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone4.result new file mode 100644 index 0000000..98e0918 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryOnOrcFile/testTimezone4.result @@ -0,0 +1,5 @@ +t_timestamp,t_date +------------------------------- +1980-03-31 11:50:30.01,1980-04-01 +1980-03-31 11:50:30,1980-04-01 +1980-03-31 11:50:30,1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-core-tests/src/test/resources/results/TestSelectQuery/testTimezonedORCTable.result ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/resources/results/TestSelectQuery/testTimezonedORCTable.result b/tajo-core-tests/src/test/resources/results/TestSelectQuery/testTimezonedORCTable.result deleted file mode 100644 index 39f593b..0000000 --- 
a/tajo-core-tests/src/test/resources/results/TestSelectQuery/testTimezonedORCTable.result +++ /dev/null @@ -1,5 +0,0 @@ -t_timestamp,t_date -------------------------------- -1980-04-01 01:50:30.01,1980-04-01 -1980-04-01 01:50:30,1980-04-01 -1980-04-01 01:50:30,1980-04-01 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-dist/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index 095f128..652ab84 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -154,22 +154,14 @@ run cp -r ${project.basedir}/src/main/conf . run rm -rf lib/tajo-*-${project.version}.jar - run mkdir hive - run mv lib/hive-*.jar hive/ - + run mkdir -p lib + run cp -r $ROOT/tajo-storage/tajo-storage-hdfs/target/lib/hive-*.jar lib/ + run mkdir -p share/jdbc-dist run cp -r $ROOT/tajo-jdbc/target/tajo-jdbc-${project.version}-jar-with-dependencies.jar ./share/jdbc-dist/tajo-jdbc-${project.version}.jar run mkdir -p extlib - if [ -f $ROOT/tajo-catalog/tajo-catalog-drivers/tajo-hive/target/lib/parquet-hive-bundle-*.jar ] - then - run cp -r $ROOT/tajo-catalog/tajo-catalog-drivers/tajo-hive/target/lib/parquet-hive-bundle-*.jar lib/ - echo - echo "Tajo installed parquet-hive-bundle library at: ${project.build.directory}/tajo-${project.version}" - echo - fi - echo echo "Tajo dist layout available at: ${project.build.directory}/tajo-${project.version}" echo http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-dist/src/main/bin/tajo ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/bin/tajo b/tajo-dist/src/main/bin/tajo index c08c538..007e960 100755 --- a/tajo-dist/src/main/bin/tajo +++ b/tajo-dist/src/main/bin/tajo @@ -300,11 +300,15 @@ if [ ! 
-z ${HIVE_HOME} ] && [ -d ${HIVE_HOME} ] && [ -d ${HIVE_LIB} ]; then CLASSPATH=${CLASSPATH}:$f; done - for f in ${HIVE_LIB}/datanucleus-*.jar; do + for f in ${HIVE_LIB}/javax.jdo-*.jar; do CLASSPATH=${CLASSPATH}:$f; done -else - for f in $TAJO_HOME/hive/*.jar; do + + for f in ${HIVE_LIB}/log4j-core-*.jar; do + CLASSPATH=${CLASSPATH}:$f; + done + + for f in ${HIVE_LIB}/datanucleus-*.jar; do CLASSPATH=${CLASSPATH}:$f; done fi http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-project/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-project/pom.xml b/tajo-project/pom.xml index cd86d3b..27fa66b 100644 --- a/tajo-project/pom.xml +++ b/tajo-project/pom.xml @@ -36,10 +36,11 @@ <hadoop.version>2.7.2</hadoop.version> <protobuf.version>2.5.0</protobuf.version> <hbase.version>1.1.1</hbase.version> - <hive.version>1.1.0</hive.version> + <hive.version>2.0.0</hive.version> <netty.version>4.0.34.Final</netty.version> <jersey.version>2.6</jersey.version> <jetty.version>6.1.26</jetty.version> + <parquet.version>1.8.1</parquet.version> <tajo.root>${project.parent.relativePath}/..</tajo.root> <extra.source.path>src/main/hadoop-${hadoop.version}</extra.source.path> </properties> http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml index 7f4661b..2454714 100644 --- a/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/main/resources/storage-default.xml @@ -130,7 +130,7 @@ <property> <name>tajo.storage.scanner-handler.orc.class</name> - <value>org.apache.tajo.storage.orc.ORCScanner</value> + <value>org.apache.tajo.storage.orc.OrcScanner</value> </property> 
<property> http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml index 934dd01..1c4530a 100644 --- a/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml +++ b/tajo-storage/tajo-storage-common/src/test/resources/storage-default.xml @@ -132,7 +132,7 @@ <property> <name>tajo.storage.scanner-handler.orc.class</name> - <value>org.apache.tajo.storage.orc.ORCScanner</value> + <value>org.apache.tajo.storage.orc.OrcScanner</value> </property> <property> http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml index 5f66395..aa6e6a6 100644 --- a/tajo-storage/tajo-storage-hdfs/pom.xml +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -34,7 +34,6 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <parquet.version>1.8.1</parquet.version> </properties> <repositories> @@ -129,7 +128,6 @@ <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> <argument>--java_out=target/generated-sources/proto</argument> <argument>src/main/proto/StorageFragmentProtos.proto</argument> - <argument>src/main/proto/orc_proto.proto</argument> </arguments> </configuration> <goals> @@ -161,6 +159,26 @@ <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-report-plugin</artifactId> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + <executions> + <execution> + 
<id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <includeScope>runtime</includeScope> + <outputDirectory>${project.build.directory}/lib</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + </configuration> + </execution> + </executions> + </plugin> </plugins> </build> @@ -345,10 +363,16 @@ <artifactId>netty-buffer</artifactId> </dependency> <dependency> - <groupId>com.facebook.presto</groupId> - <artifactId>presto-orc</artifactId> - <version>0.141</version> + <groupId>org.apache.hive</groupId> + <artifactId>hive-orc</artifactId> + <version>${hive.version}</version> </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + <version>${hive.version}</version> + </dependency> + </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java index 7999d02..b27c640 100644 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java @@ -20,6 +20,9 @@ package org.apache.tajo.storage.orc; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.orc.CompressionKind; +import org.apache.orc.OrcConf; +import org.apache.orc.TypeDescription; import org.apache.tajo.TajoConstants; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; @@ -29,12 +32,13 @@ import 
org.apache.tajo.storage.FileAppender; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.storage.TableStatistics; import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.orc.objectinspector.ObjectInspectorFactory; -import org.apache.tajo.storage.thirdparty.orc.CompressionKind; import org.apache.tajo.storage.thirdparty.orc.OrcFile; +import org.apache.tajo.storage.thirdparty.orc.OrcFile.EncodingStrategy; +import org.apache.tajo.storage.thirdparty.orc.OrcUtils; import org.apache.tajo.storage.thirdparty.orc.Writer; import java.io.IOException; +import java.util.Properties; import java.util.TimeZone; public class ORCAppender extends FileAppender { @@ -46,21 +50,14 @@ public class ORCAppender extends FileAppender { TableMeta meta, Path workDir) { super(conf, taskAttemptId, schema, meta, workDir); - timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, - TajoConstants.DEFAULT_SYSTEM_TIMEZONE)); + timezone = meta.containsProperty(StorageConstants.TIMEZONE) ? 
+ TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE)) : + TimeZone.getDefault(); } @Override public void init() throws IOException { - writer = OrcFile.createWriter(workDir.getFileSystem(conf), path, conf, - ObjectInspectorFactory.buildStructObjectInspector(schema), - Long.parseLong(meta.getProperty(StorageConstants.ORC_STRIPE_SIZE, - StorageConstants.DEFAULT_ORC_STRIPE_SIZE)), getCompressionKind(), - Integer.parseInt(meta.getProperty(StorageConstants.ORC_BUFFER_SIZE, - StorageConstants.DEFAULT_ORC_BUFFER_SIZE)), - Integer.parseInt(meta.getProperty(StorageConstants.ORC_ROW_INDEX_STRIDE, - StorageConstants.DEFAULT_ORC_ROW_INDEX_STRIDE)), - timezone); + writer = OrcFile.createWriter(path, buildWriterOptions(conf, meta, schema), timezone); if (tableStatsEnabled) { this.stats = new TableStatistics(schema, columnStatsEnabled); @@ -90,7 +87,6 @@ public class ORCAppender extends FileAppender { public void close() throws IOException { writer.close(); - // TODO: getOffset is not implemented yet // if (tableStatsEnabled) { // stats.setNumBytes(getOffset()); // } @@ -107,24 +103,81 @@ public class ORCAppender extends FileAppender { @Override public long getEstimatedOutputSize() throws IOException { - return writer.getRawDataSize() * writer.getNumberOfRows(); + return writer.getRawDataSize(); } - private CompressionKind getCompressionKind() { - String kindstr = meta.getProperty(StorageConstants.ORC_COMPRESSION, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND); + private static OrcFile.WriterOptions buildWriterOptions(Configuration conf, TableMeta meta, Schema schema) { + return OrcFile.writerOptions(conf) + .setSchema(OrcUtils.convertSchema(schema)) + .compress(getCompressionKind(meta)) + .stripeSize(Long.parseLong(meta.getProperty(OrcConf.STRIPE_SIZE.getAttribute(), + String.valueOf(OrcConf.STRIPE_SIZE.getDefaultValue())))) + .blockSize(Long.parseLong(meta.getProperty(OrcConf.BLOCK_SIZE.getAttribute(), + String.valueOf(OrcConf.BLOCK_SIZE.getDefaultValue())))) + 
.rowIndexStride(Integer.parseInt(meta.getProperty(OrcConf.ROW_INDEX_STRIDE.getAttribute(), + String.valueOf(OrcConf.ROW_INDEX_STRIDE.getDefaultValue())))) + .bufferSize(Integer.parseInt(meta.getProperty(OrcConf.BUFFER_SIZE.getAttribute(), + String.valueOf(OrcConf.BUFFER_SIZE.getDefaultValue())))) + .blockPadding(Boolean.parseBoolean(meta.getProperty(OrcConf.BLOCK_PADDING.getAttribute(), + String.valueOf(OrcConf.BLOCK_PADDING.getDefaultValue())))) + .encodingStrategy(EncodingStrategy.valueOf(meta.getProperty(OrcConf.ENCODING_STRATEGY.getAttribute(), + String.valueOf(OrcConf.ENCODING_STRATEGY.getDefaultValue())))) + .bloomFilterFpp(Double.parseDouble(meta.getProperty(OrcConf.BLOOM_FILTER_FPP.getAttribute(), + String.valueOf(OrcConf.BLOOM_FILTER_FPP.getDefaultValue())))) + .bloomFilterColumns(meta.getProperty(OrcConf.BLOOM_FILTER_COLUMNS.getAttribute(), + String.valueOf(OrcConf.BLOOM_FILTER_COLUMNS.getDefaultValue()))); + } + + private static CompressionKind getCompressionKind(TableMeta meta) { + String kindstr = meta.getProperty(OrcConf.COMPRESS.getAttribute(), + String.valueOf(OrcConf.COMPRESS.getDefaultValue())); - if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_ZIP)) { + if (kindstr.equalsIgnoreCase(CompressionKind.ZLIB.name())) { return CompressionKind.ZLIB; } - if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_SNAPPY)) { + if (kindstr.equalsIgnoreCase(CompressionKind.SNAPPY.name())) { return CompressionKind.SNAPPY; } - if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_LZO)) { + if (kindstr.equalsIgnoreCase(CompressionKind.LZO.name())) { return CompressionKind.LZO; } return CompressionKind.NONE; } + + /** + * Options for creating ORC file writers. + */ + public static class WriterOptions extends OrcFile.WriterOptions { + // Setting the default batch size to 1000 makes the memory check at 5000 + // rows work the same as the row by row writer. 
(If it was the default 1024, + // the smallest stripe size would be 5120 rows, which changes the output + // of some of the tests.) + private int batchSize = 1000; + + public WriterOptions(Properties tableProperties, Configuration conf) { + super(tableProperties, conf); + } + + /** + * Set the schema for the file. This is a required parameter. + * @param schema the schema for the file. + * @return this + */ + public WriterOptions setSchema(TypeDescription schema) { + super.setSchema(schema); + return this; + } + + protected WriterOptions batchSize(int maxSize) { + batchSize = maxSize; + return this; + } + + int getBatchSize() { + return batchSize; + } + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java deleted file mode 100644 index 0a4ebc6..0000000 --- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java +++ /dev/null @@ -1,332 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.orc; - -import com.facebook.presto.orc.OrcDataSource; -import com.facebook.presto.orc.OrcPredicate; -import com.facebook.presto.orc.OrcReader; -import com.facebook.presto.orc.OrcRecordReader; -import com.facebook.presto.orc.memory.AggregatedMemoryContext; -import com.facebook.presto.orc.metadata.OrcMetadataReader; -import com.facebook.presto.spi.block.Block; -import com.facebook.presto.spi.type.*; -import com.google.protobuf.InvalidProtocolBufferException; -import io.airlift.units.DataSize; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.*; -import org.apache.tajo.exception.NotImplementedException; -import org.apache.tajo.exception.TajoRuntimeException; -import org.apache.tajo.plan.expr.EvalNode; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.thirdparty.orc.HdfsOrcDataSource; -import org.apache.tajo.util.datetime.DateTimeUtil; -import org.joda.time.DateTimeZone; - -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.TimeZone; - -/** - * OrcScanner for reading ORC files - */ -public class ORCScanner extends FileScanner { - private static final Log LOG = LogFactory.getLog(ORCScanner.class); - private OrcRecordReader recordReader; - 
private Block[] blocks; - private int currentPosInBatch = 0; - private int batchSize = 0; - private Tuple outTuple; - private AggregatedMemoryContext aggrMemoryContext = new AggregatedMemoryContext(); - - public ORCScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) { - super(conf, schema, meta, fragment); - } - - private FileSystem fs; - private FSDataInputStream fis; - - private static class ColumnInfo { - TajoDataTypes.DataType type; - int id; - } - - /** - * Temporary array for caching column info - */ - private ColumnInfo [] targetColInfo; - - @Override - public void init() throws IOException { - OrcReader orcReader; - DataSize maxMergeDistance = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_MERGE_DISTANCE, - StorageConstants.DEFAULT_ORC_MAX_MERGE_DISTANCE)), DataSize.Unit.BYTE); - DataSize maxReadSize = new DataSize(Double.parseDouble(meta.getProperty(StorageConstants.ORC_MAX_READ_BUFFER_SIZE, - StorageConstants.DEFAULT_ORC_MAX_READ_BUFFER_SIZE)), DataSize.Unit.BYTE); - - if (targets == null) { - targets = schema.toArray(); - } - - outTuple = new VTuple(targets.length); - - Path path = fragment.getPath(); - - if(fs == null) { - fs = FileScanner.getFileSystem((TajoConf)conf, path); - } - - if(fis == null) { - fis = fs.open(path); - } - - OrcDataSource orcDataSource = new HdfsOrcDataSource( - this.fragment.getPath().toString(), - fis, - fs.getFileStatus(path).getLen(), - maxMergeDistance, - maxReadSize); - - targetColInfo = new ColumnInfo[targets.length]; - for (int i=0; i<targets.length; i++) { - ColumnInfo cinfo = new ColumnInfo(); - cinfo.type = targets[i].getDataType(); - cinfo.id = schema.getColumnId(targets[i].getQualifiedName()); - targetColInfo[i] = cinfo; - } - - // creating blocks for buffering - blocks = new Block[targetColInfo.length]; - - Map<Integer, Type> columnMap = new HashMap<>(); - for (ColumnInfo colInfo: targetColInfo) { - columnMap.put(colInfo.id, 
createFBtypeByTajoType(colInfo.type)); - } - - orcReader = new OrcReader(orcDataSource, new OrcMetadataReader(), maxMergeDistance, maxReadSize); - - TimeZone timezone = TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, - TajoConstants.DEFAULT_SYSTEM_TIMEZONE)); - - // TODO: make OrcPredicate useful - // presto-orc uses joda timezone, so it needs to be converted. - recordReader = orcReader.createRecordReader(columnMap, OrcPredicate.TRUE, - fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone), aggrMemoryContext); - - super.init(); - LOG.debug("file fragment { path: " + fragment.getPath() + - ", start offset: " + fragment.getStartKey() + - ", length: " + fragment.getLength() + "}"); - } - - @Override - public Tuple next() throws IOException { - if (currentPosInBatch == batchSize) { - getNextBatch(); - - // EOF - if (batchSize == -1) { - return null; - } - } - - for (int i=0; i<targetColInfo.length; i++) { - outTuple.put(i, createValueDatum(blocks[i], targetColInfo[i].type)); - } - - currentPosInBatch++; - - return outTuple; - } - - private Type createFBtypeByTajoType(TajoDataTypes.DataType type) { - switch(type.getType()) { - case BOOLEAN: - return BooleanType.BOOLEAN; - - case INT1: - case INT2: - case INT4: - case INT8: - case INET4: - case NULL_TYPE: // meaningless - return BigintType.BIGINT; - - case TIMESTAMP: - return TimestampType.TIMESTAMP; - - case DATE: - return DateType.DATE; - - case FLOAT4: - case FLOAT8: - return DoubleType.DOUBLE; - - case CHAR: - case TEXT: - return VarcharType.VARCHAR; - - case BLOB: - case PROTOBUF: - return VarbinaryType.VARBINARY; - - default: - throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc")); - } - } - - // TODO: support more types - private Datum createValueDatum(Block block, TajoDataTypes.DataType type) { - if (block.isNull(currentPosInBatch)) - return NullDatum.get(); - - // NOTE: block.get***() methods are determined by the type size 
wich is in createFBtypeByTajoType() - switch (type.getType()) { - case INT1: - return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0)); - - case INT2: - return DatumFactory.createInt2((short)block.getLong(currentPosInBatch, 0)); - - case INT4: - return DatumFactory.createInt4((int)block.getLong(currentPosInBatch, 0)); - - case INT8: - return DatumFactory.createInt8(block.getLong(currentPosInBatch, 0)); - - case FLOAT4: - return DatumFactory.createFloat4((float)block.getDouble(currentPosInBatch, 0)); - - case FLOAT8: - return DatumFactory.createFloat8(block.getDouble(currentPosInBatch, 0)); - - case BOOLEAN: - return DatumFactory.createBool(block.getByte(currentPosInBatch, 0) != 0); - - case CHAR: - return DatumFactory.createChar(block.getSlice(currentPosInBatch, 0, - block.getLength(currentPosInBatch)).getBytes()); - - case TEXT: - return DatumFactory.createText(block.getSlice(currentPosInBatch, 0, - block.getLength(currentPosInBatch)).getBytes()); - - case BLOB: - return DatumFactory.createBlob(block.getSlice(currentPosInBatch, 0, - block.getLength(currentPosInBatch)).getBytes()); - - case PROTOBUF: - try { - return ProtobufDatumFactory.createDatum(type, block.getSlice(currentPosInBatch, 0, - block.getLength(currentPosInBatch)).getBytes()); - } catch (InvalidProtocolBufferException e) { - LOG.error("ERROR", e); - return NullDatum.get(); - } - - case TIMESTAMP: - return DatumFactory.createTimestamp( - DateTimeUtil.javaTimeToJulianTime(block.getLong(currentPosInBatch, 0))); - - case DATE: - return DatumFactory.createDate( - block.getInt(currentPosInBatch, 0) + DateTimeUtil.DAYS_FROM_JULIAN_TO_EPOCH); - - case INET4: - return DatumFactory.createInet4((int)block.getLong(currentPosInBatch, 0)); - - case NULL_TYPE: - return NullDatum.get(); - - default: - throw new TajoRuntimeException(new NotImplementedException(type.getType().name() + " for orc")); - } - } - - /** - * Fetch next batch from ORC file and write to block data structure as many as batch 
size - * - * @throws IOException - */ - private void getNextBatch() throws IOException { - batchSize = recordReader.nextBatch(); - - // end of file - if (batchSize == -1) - return; - - for (int i=0; i<targetColInfo.length; i++) { - blocks[i] = recordReader.readBlock(createFBtypeByTajoType(targetColInfo[i].type), targetColInfo[i].id); - } - - currentPosInBatch = 0; - } - - @Override - public float getProgress() { - if(!inited) return super.getProgress(); - - return recordReader.getProgress(); - } - - @Override - public void reset() throws IOException { - } - - @Override - public void close() throws IOException { - if (recordReader != null) { - recordReader.close(); - } - } - - @Override - public boolean isProjectable() { - return true; - } - - @Override - public boolean isSelectable() { - return false; - } - - @Override - public void setFilter(EvalNode filter) { - // TODO: implement it - } - - @Override - public boolean isSplittable() { - return true; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/68263585/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java new file mode 100644 index 0000000..c8aa67b --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/OrcScanner.java @@ -0,0 +1,460 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.orc;

import com.google.common.collect.Lists;
import com.google.protobuf.CodedInputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.orc.*;
import org.apache.orc.Reader.Options;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.impl.InStream;
import org.apache.tajo.TajoConstants;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.plan.expr.EvalNode;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.StorageConstants;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.thirdparty.orc.OrcRecordReader;
import org.apache.tajo.storage.thirdparty.orc.OrcUtils;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;

/**
 * A {@link FileScanner} for ORC files. On {@link #init()} it reads the ORC
 * tail (PostScript, Footer, and Metadata sections) from the fragment's file,
 * then delegates row reading to an {@link OrcRecordReader} built from the
 * decoded stripe list and type information.
 */
public class OrcScanner extends FileScanner {
  private static final Log LOG = LogFactory.getLog(OrcScanner.class);

  // Initial read size when probing the file tail for PostScript + Footer.
  private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;

  protected final FileSystem fileSystem;
  private final long maxLength = Long.MAX_VALUE;
  protected final Path path;
  protected org.apache.orc.CompressionKind compressionKind;
  protected CompressionCodec codec;
  protected int bufferSize;
  private List<OrcProto.StripeStatistics> stripeStats;
  private int metadataSize;
  protected List<OrcProto.Type> types;
  private List<OrcProto.UserMetadataItem> userMetadata;
  private List<OrcProto.ColumnStatistics> fileStats;
  private List<StripeInformation> stripes;
  protected int rowIndexStride;
  private long contentLength, numberOfRows;

  private List<Integer> versionList;

  //serialized footer - Keeping this around for use by getFileMetaInfo()
  // will help avoid cpu cycles spend in deserializing at cost of increased
  // memory footprint.
  private ByteBuffer footerByteBuffer;
  // Same for metastore cache - maintains the same background buffer, but includes postscript.
  // This will only be set if the file footer/metadata was read from disk.
  private ByteBuffer footerMetaAndPsBuffer;

  private OrcRecordReader recordReader;

  // Number of tuples returned so far; reported through tableStats on close().
  private long recordCount = 0;

  /**
   * Ensure this is an ORC file to prevent users from trying to read text
   * files or RC files as ORC files.
   * @param in the file being read
   * @param path the filename for error messages
   * @param psLen the postscript length
   * @param buffer the tail of the file
   * @throws IOException if the magic bytes are absent from both the
   *         postscript and the file header
   */
  static void ensureOrcFooter(FSDataInputStream in,
                              Path path,
                              int psLen,
                              ByteBuffer buffer) throws IOException {
    int len = OrcFile.MAGIC.length();
    if (psLen < len + 1) {
      throw new IOException("Malformed ORC file " + path +
          ". Invalid postscript length " + psLen);
    }
    int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1 - len;
    byte[] array = buffer.array();
    // now look for the magic string at the end of the postscript.
    if (!Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
      // If it isn't there, this may be the 0.11.0 version of ORC.
      // Read the first 3 bytes of the file to check for the header
      byte[] header = new byte[len];
      in.readFully(0, header, 0, len);
      // if it isn't there, this isn't an ORC file
      if (!Text.decode(header, 0 , len).equals(OrcFile.MAGIC)) {
        throw new IOException("Malformed ORC file " + path +
            ". Invalid postscript.");
      }
    }
  }

  /**
   * Build a version string out of an array.
   * @param version the version number as a list
   * @return the human readable form of the version string
   */
  private static String versionString(List<Integer> version) {
    StringBuilder buffer = new StringBuilder();
    for(int i=0; i < version.size(); ++i) {
      if (i != 0) {
        buffer.append('.');
      }
      buffer.append(version.get(i));
    }
    return buffer.toString();
  }

  /**
   * Check to see if this ORC file is from a future version and if so,
   * warn the user that we may not be able to read all of the column encodings.
   * @param log the logger to write any error message to
   * @param path the data source path for error messages
   * @param version the version of hive that wrote the file.
   */
  static void checkOrcVersion(Log log, Path path, List<Integer> version) {
    if (version.size() >= 1) {
      int major = version.get(0);
      int minor = 0;
      if (version.size() >= 2) {
        minor = version.get(1);
      }
      if (major > OrcFile.Version.CURRENT.getMajor() ||
          (major == OrcFile.Version.CURRENT.getMajor() &&
              minor > OrcFile.Version.CURRENT.getMinor())) {
        log.warn(path + " was written by a future Hive version " +
            versionString(version) +
            ". This file may not be readable by this version of Hive.");
      }
    }
  }

  /**
   * Creates a scanner over the ORC file referenced by the given fragment.
   * Heavy work (tail parsing, record-reader construction) is deferred to
   * {@link #init()}.
   */
  public OrcScanner(Configuration conf, Schema schema, TableMeta meta, Fragment fragment) throws IOException {
    super(conf, schema, meta, fragment);

    this.path = this.fragment.getPath();
    this.fileSystem = this.path.getFileSystem(conf);
  }

  /**
   * Reads the tail of the file and extracts the serialized footer/metadata
   * sections plus the PostScript fields needed to decode them.
   *
   * @param fs the filesystem holding the file
   * @param path the file to read
   * @param maxFileLength {@link Long#MAX_VALUE} to use the real file length,
   *        otherwise an explicit upper bound on readable bytes
   * @throws IOException on read failure or malformed ORC tail
   */
  private static FileMetaInfo extractMetaInfoFromFooter(FileSystem fs,
                                                        Path path,
                                                        long maxFileLength
  ) throws IOException {
    FSDataInputStream file = fs.open(path);

    // NOTE: close the stream even when tail parsing fails; the original code
    // leaked the stream on any exception thrown before the explicit close().
    try {
      // figure out the size of the file using the option or filesystem
      long size;
      if (maxFileLength == Long.MAX_VALUE) {
        size = fs.getFileStatus(path).getLen();
      } else {
        size = maxFileLength;
      }

      //read last bytes into buffer to get PostScript
      int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
      ByteBuffer buffer = ByteBuffer.allocate(readSize);
      assert buffer.position() == 0;
      file.readFully((size - readSize),
          buffer.array(), buffer.arrayOffset(), readSize);
      buffer.position(0);

      //read the PostScript
      //get length of PostScript
      int psLen = buffer.get(readSize - 1) & 0xff;
      ensureOrcFooter(file, path, psLen, buffer);
      int psOffset = readSize - 1 - psLen;
      OrcProto.PostScript ps = extractPostScript(buffer, path, psLen, psOffset);

      int footerSize = (int) ps.getFooterLength();
      int metadataSize = (int) ps.getMetadataLength();

      //check if extra bytes need to be read
      ByteBuffer fullFooterBuffer = null;
      int extra = Math.max(0, psLen + 1 + footerSize + metadataSize - readSize);
      if (extra > 0) {
        //more bytes need to be read, seek back to the right place and read extra bytes
        ByteBuffer extraBuf = ByteBuffer.allocate(extra + readSize);
        file.readFully((size - readSize - extra), extraBuf.array(),
            extraBuf.arrayOffset() + extraBuf.position(), extra);
        extraBuf.position(extra);
        //append with already read bytes
        extraBuf.put(buffer);
        buffer = extraBuf;
        buffer.position(0);
        fullFooterBuffer = buffer.slice();
        buffer.limit(footerSize + metadataSize);
      } else {
        //footer is already in the bytes in buffer, just adjust position, length
        buffer.position(psOffset - footerSize - metadataSize);
        fullFooterBuffer = buffer.slice();
        buffer.limit(psOffset);
      }

      // remember position for later
      buffer.mark();

      return new FileMetaInfo(
          ps.getCompression().toString(),
          (int) ps.getCompressionBlockSize(),
          (int) ps.getMetadataLength(),
          buffer,
          ps.getVersionList(),
          org.apache.orc.OrcFile.WriterVersion.FUTURE,
          fullFooterBuffer
      );
    } finally {
      file.close();
    }
  }

  /**
   * Builds a fresh {@link OrcRecordReader} over this scanner's stripes using
   * the table's timezone (falling back to the system default timezone).
   */
  public OrcRecordReader createRecordReader() throws IOException {
    return new OrcRecordReader(this.stripes, fileSystem, schema, targets, fragment, types, codec, bufferSize,
        rowIndexStride, buildReaderOptions(meta), conf,
        TimeZone.getTimeZone(meta.getProperty(StorageConstants.TIMEZONE, TajoConstants.DEFAULT_SYSTEM_TIMEZONE)));
  }

  /**
   * Translates table properties into ORC reader {@link Options}
   * (zero-copy reads and corrupt-record skipping).
   */
  private static Options buildReaderOptions(TableMeta meta) {
    return new Options()
        .useZeroCopy(Boolean.parseBoolean(meta.getProperty(OrcConf.USE_ZEROCOPY.getAttribute(),
            String.valueOf(OrcConf.USE_ZEROCOPY.getDefaultValue()))))
        .skipCorruptRecords(Boolean.parseBoolean(meta.getProperty(OrcConf.SKIP_CORRUPT_DATA.getAttribute(),
            String.valueOf(OrcConf.SKIP_CORRUPT_DATA.getDefaultValue()))));
  }

  /**
   * Parses the ORC tail, caches footer/metadata state on this instance, and
   * creates the record reader. Must be called before {@link #next()}.
   */
  @Override
  public void init() throws IOException {
    FileMetaInfo footerMetaData = extractMetaInfoFromFooter(fileSystem, path, maxLength);
    this.footerMetaAndPsBuffer = footerMetaData.footerMetaAndPsBuffer;
    MetaInfoObjExtractor rInfo =
        new MetaInfoObjExtractor(footerMetaData.compressionType,
            footerMetaData.bufferSize,
            footerMetaData.metadataSize,
            footerMetaData.footerBuffer
        );
    this.footerByteBuffer = footerMetaData.footerBuffer;
    this.compressionKind = rInfo.compressionKind;
    this.codec = rInfo.codec;
    this.bufferSize = rInfo.bufferSize;
    this.metadataSize = rInfo.metadataSize;
    this.stripeStats = rInfo.metadata.getStripeStatsList();
    this.types = rInfo.footer.getTypesList();
    this.rowIndexStride = rInfo.footer.getRowIndexStride();
    this.contentLength = rInfo.footer.getContentLength();
    this.numberOfRows = rInfo.footer.getNumberOfRows();
    this.userMetadata = rInfo.footer.getMetadataList();
    this.fileStats = rInfo.footer.getStatisticsList();
    this.versionList = footerMetaData.versionList;
    this.stripes = convertProtoStripesToStripes(rInfo.footer.getStripesList());

    recordReader = createRecordReader();

    super.init();
  }

  /**
   * Returns the next tuple, or null when the file is exhausted.
   */
  @Override
  public Tuple next() throws IOException {
    Tuple next = recordReader.next();
    if (next != null) {
      recordCount++;
    }
    return next;
  }

  @Override
  public void reset() throws IOException {
    // TODO: improve this
    this.close();
    recordReader = createRecordReader();
  }

  /**
   * Closes the record reader and publishes byte/row counts to tableStats.
   */
  @Override
  public void close() throws IOException {
    if (recordReader != null) {
      recordReader.close();
      tableStats.setNumBytes(recordReader.getNumBytes());
      tableStats.setNumRows(recordCount);
    }
  }

  @Override
  public boolean isProjectable() {
    return true;
  }

  @Override
  public boolean isSelectable() {
    return false;
  }

  @Override
  public void setFilter(EvalNode filter) {
    // TODO: implement this
  }

  @Override
  public float getProgress() {
    return inited ? recordReader.getProgress() : super.getProgress();
  }

  @Override
  public boolean isSplittable() {
    return true;
  }

  /**
   * Parses the PostScript section from the file tail and validates the
   * writer version and compression kind.
   */
  private static OrcProto.PostScript extractPostScript(ByteBuffer bb, Path path,
                                                       int psLen, int psAbsOffset) throws IOException {
    // TODO: when PB is upgraded to 2.6, newInstance(ByteBuffer) method should be used here.
    assert bb.hasArray();
    CodedInputStream in = CodedInputStream.newInstance(
        bb.array(), bb.arrayOffset() + psAbsOffset, psLen);
    OrcProto.PostScript ps = OrcProto.PostScript.parseFrom(in);
    checkOrcVersion(LOG, path, ps.getVersionList());

    // Check compression codec.
    switch (ps.getCompression()) {
      case NONE:
      case ZLIB:
      case SNAPPY:
      case LZO:
        break;
      default:
        throw new IllegalArgumentException("Unknown compression");
    }
    return ps;
  }

  /**
   * Decodes the Footer protobuf from the given absolute range of the buffer,
   * decompressing with the supplied codec if necessary.
   */
  private static OrcProto.Footer extractFooter(ByteBuffer bb, int footerAbsPos,
                                               int footerSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(footerAbsPos);
    bb.limit(footerAbsPos + footerSize);
    return OrcProto.Footer.parseFrom(InStream.createCodedInputStream("footer",
        Lists.newArrayList(new BufferChunk(bb, 0)), footerSize, codec, bufferSize));
  }

  /**
   * Decodes the Metadata protobuf from the given absolute range of the buffer,
   * decompressing with the supplied codec if necessary.
   */
  private static OrcProto.Metadata extractMetadata(ByteBuffer bb, int metadataAbsPos,
                                                   int metadataSize, CompressionCodec codec, int bufferSize) throws IOException {
    bb.position(metadataAbsPos);
    bb.limit(metadataAbsPos + metadataSize);
    return OrcProto.Metadata.parseFrom(InStream.createCodedInputStream("metadata",
        Lists.newArrayList(new BufferChunk(bb, 0)), metadataSize, codec, bufferSize));
  }

  /**
   * MetaInfoObjExtractor - has logic to create the values for the fields in ReaderImpl
   * from serialized fields.
   * As the fields are final, the fields need to be initialized in the constructor and
   * can't be done in some helper function. So this helper class is used instead.
   */
  private static class MetaInfoObjExtractor{
    final org.apache.orc.CompressionKind compressionKind;
    final CompressionCodec codec;
    final int bufferSize;
    final int metadataSize;
    final OrcProto.Metadata metadata;
    final OrcProto.Footer footer;

    MetaInfoObjExtractor(String codecStr, int bufferSize, int metadataSize,
                         ByteBuffer footerBuffer) throws IOException {

      this.compressionKind = org.apache.orc.CompressionKind.valueOf(codecStr);
      this.bufferSize = bufferSize;
      this.codec = OrcUtils.createCodec(compressionKind);
      this.metadataSize = metadataSize;

      int position = footerBuffer.position();
      // the buffer holds [metadata][footer]; footer size is the remainder
      int footerBufferSize = footerBuffer.limit() - footerBuffer.position() - metadataSize;

      this.metadata = extractMetadata(footerBuffer, position, metadataSize, codec, bufferSize);
      this.footer = extractFooter(
          footerBuffer, position + metadataSize, footerBufferSize, codec, bufferSize);

      // restore the caller's view of the buffer
      footerBuffer.position(position);
    }
  }

  /**
   * Immutable {@link org.apache.orc.StripeInformation} view over the
   * protobuf stripe record.
   */
  public static class StripeInformationImpl
      implements org.apache.orc.StripeInformation {
    private final OrcProto.StripeInformation stripe;

    public StripeInformationImpl(OrcProto.StripeInformation stripe) {
      this.stripe = stripe;
    }

    @Override
    public long getOffset() {
      return stripe.getOffset();
    }

    @Override
    public long getLength() {
      return stripe.getDataLength() + getIndexLength() + getFooterLength();
    }

    @Override
    public long getDataLength() {
      return stripe.getDataLength();
    }

    @Override
    public long getFooterLength() {
      return stripe.getFooterLength();
    }

    @Override
    public long getIndexLength() {
      return stripe.getIndexLength();
    }

    @Override
    public long getNumberOfRows() {
      return stripe.getNumberOfRows();
    }

    @Override
    public String toString() {
      return "offset: " + getOffset() + " data: " + getDataLength() +
          " rows: " + getNumberOfRows() + " tail: " + getFooterLength() +
          " index: " + getIndexLength();
    }
  }

  /**
   * Wraps each protobuf stripe record in a {@link StripeInformationImpl}.
   */
  private static List<StripeInformation> convertProtoStripesToStripes(
      List<OrcProto.StripeInformation> stripes) {
    List<StripeInformation> result = new ArrayList<>(stripes.size());
    for (OrcProto.StripeInformation info : stripes) {
      result.add(new StripeInformationImpl(info));
    }
    return result;
  }

}
- */ - -package org.apache.tajo.storage.orc.objectinspector; - -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.exception.UnsupportedException; - -public class ObjectInspectorFactory { - - public static StructObjectInspector buildStructObjectInspector(Schema schema) { - StructObjectInspector structOI = new TajoStructObjectInspector(schema); - return structOI; - } - - public static ObjectInspector buildObjectInspectorByType(TajoDataTypes.Type dataType) throws UnsupportedException { - ObjectInspector oi = null; - - switch(dataType) { - case BOOLEAN: - oi = new TajoBooleanObjectInspector(); - break; - - case INT2: - oi = new TajoShortObjectInspector(); - break; - - case INET4: - case INT4: - oi = new TajoIntObjectInspector(); - break; - - case INT8: - oi = new TajoLongObjectInspector(); - break; - - case FLOAT4: - oi = new TajoFloatObjectInspector(); - break; - - case FLOAT8: - oi = new TajoDoubleObjectInspector(); - break; - - case TEXT: - case CHAR: - oi = new TajoStringObjectInspector(); - break; - - case TIMESTAMP: - oi = new TajoTimestampObjectInspector(); - break; - - case DATE: - oi = new TajoDateObjectInspector(); - break; - - case BLOB: - case PROTOBUF: - oi = new TajoBlobObjectInspector(); - break; - - case NULL_TYPE: - oi = new TajoNullObjectInspector(); - break; - - default: - throw new UnsupportedException(dataType.name()+" is not supported yet in OrcAppender"); - } - - return oi; - } -}
