Repository: tajo Updated Branches: refs/heads/master a3a178a93 -> dc40d849e
TAJO-714: Enable setting Parquet tuning parameters. (David Chen via hyunsik) Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/dc40d849 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/dc40d849 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/dc40d849 Branch: refs/heads/master Commit: dc40d849e4f52b93753627ae699e4c161e4ebdf1 Parents: a3a178a Author: Hyunsik Choi <[email protected]> Authored: Wed Apr 2 11:54:05 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Wed Apr 2 11:54:05 2014 +0900 ---------------------------------------------------------------------- CHANGES.txt | 2 ++ pom.xml | 2 ++ tajo-catalog/tajo-catalog-common/pom.xml | 16 +++++++++++ .../apache/tajo/catalog/CatalogConstants.java | 29 ++++++++++++++++++++ .../org/apache/tajo/catalog/CatalogUtil.java | 13 +++++++++ tajo-storage/pom.xml | 3 +- .../tajo/storage/parquet/ParquetAppender.java | 27 +++++++++++++++++- .../tajo/storage/parquet/ParquetScanner.java | 1 + .../apache/tajo/storage/TestMergeScanner.java | 1 + .../org/apache/tajo/storage/TestStorages.java | 3 ++ .../apache/tajo/storage/v2/TestStorages.java | 3 ++ 11 files changed, 97 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 83990ff..ffeceb0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -143,6 +143,8 @@ Release 0.8.0 - unreleased IMPROVEMENTS + TAJO-714: Enable setting Parquet tuning parameters. (David Chen via hyunsik) + TAJO-691: HashJoin or HashAggregation is too slow if there is many unique keys. (hyoungjunkim via hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 0421119..d09559e 100644 --- a/pom.xml +++ b/pom.xml @@ -75,6 +75,8 @@ <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <tajo.version>0.8.0-SNAPSHOT</tajo.version> <tajo.root>${basedir}</tajo.root> + <parquet.version>1.3.2</parquet.version> + <parquet.format.version>2.0.0</parquet.format.version> </properties> <modules> http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/pom.xml b/tajo-catalog/tajo-catalog-common/pom.xml index a4db647..fa90237 100644 --- a/tajo-catalog/tajo-catalog-common/pom.xml +++ b/tajo-catalog/tajo-catalog-common/pom.xml @@ -160,6 +160,22 @@ <groupId>commons-logging</groupId> <artifactId>commons-logging-api</artifactId> </dependency> + + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-column</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-format</artifactId> + <version>${parquet.format.version}</version> + </dependency> </dependencies> <profiles> http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java index c5e0dd4..c695fc8 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogConstants.java @@ -18,6 +18,9 @@ package org.apache.tajo.catalog; +import parquet.hadoop.ParquetWriter; +import parquet.hadoop.metadata.CompressionCodecName; + public class CatalogConstants { public static final String STORE_CLASS="tajo.catalog.store.class"; @@ -64,4 +67,30 @@ public class CatalogConstants { public static final String DEFAULT_FIELD_DELIMITER = "|"; public static final String DEFAULT_BINARY_SERDE = "org.apache.tajo.storage.BinarySerializerDeserializer"; public static final String DEFAULT_TEXT_SERDE = "org.apache.tajo.storage.TextSerializerDeserializer"; + + public static final String PARQUET_DEFAULT_BLOCK_SIZE; + public static final String PARQUET_DEFAULT_PAGE_SIZE; + public static final String PARQUET_DEFAULT_COMPRESSION_CODEC_NAME; + public static final String PARQUET_DEFAULT_IS_DICTIONARY_ENABLED; + public static final String PARQUET_DEFAULT_IS_VALIDATION_ENABLED; + + static { + PARQUET_DEFAULT_BLOCK_SIZE = + Integer.toString(ParquetWriter.DEFAULT_BLOCK_SIZE); + PARQUET_DEFAULT_PAGE_SIZE = + Integer.toString(ParquetWriter.DEFAULT_PAGE_SIZE); + + // When parquet-hadoop 1.3.3 is available, this should be changed to + // ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME. + PARQUET_DEFAULT_COMPRESSION_CODEC_NAME = + CompressionCodecName.UNCOMPRESSED.name().toLowerCase(); + + // When parquet-hadoop 1.3.3 is available, this should be changed to + // ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED. + PARQUET_DEFAULT_IS_DICTIONARY_ENABLED = "true"; + + // When parquet-hadoop 1.3.3 is available, this should be changed to + // ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED. + PARQUET_DEFAULT_IS_VALIDATION_ENABLED = "false"; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 6e7d2a5..f9f92f0 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -36,6 +36,8 @@ import java.util.Collection; import java.util.HashSet; import java.util.Set; +import parquet.hadoop.ParquetOutputFormat; + import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import static org.apache.tajo.common.TajoDataTypes.Type; @@ -296,6 +298,17 @@ public class CatalogUtil { } else if(StoreType.SEQUENCEFILE == type){ options.put(CatalogConstants.SEQUENCEFILE_SERDE, CatalogConstants.DEFAULT_TEXT_SERDE); options.put(CatalogConstants.SEQUENCEFILE_DELIMITER, CatalogConstants.DEFAULT_FIELD_DELIMITER); + } else if (type == StoreType.PARQUET) { + options.put(ParquetOutputFormat.BLOCK_SIZE, + CatalogConstants.PARQUET_DEFAULT_BLOCK_SIZE); + options.put(ParquetOutputFormat.PAGE_SIZE, + CatalogConstants.PARQUET_DEFAULT_PAGE_SIZE); + options.put(ParquetOutputFormat.COMPRESSION, + CatalogConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME); + options.put(ParquetOutputFormat.ENABLE_DICTIONARY, + CatalogConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED); + options.put(ParquetOutputFormat.VALIDATION, + CatalogConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED); } return options; http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/pom.xml b/tajo-storage/pom.xml index c3f08cb..b9a162a 100644 --- a/tajo-storage/pom.xml +++ b/tajo-storage/pom.xml @@ -35,8 +35,6 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> - <parquet.version>1.3.2</parquet.version> - <parquet.format.version>2.0.0</parquet.format.version> </properties> <repositories> @@ -265,6 +263,7 @@ <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> + <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-column</artifactId> http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java index cb2f243..10b9331 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java @@ -18,6 +18,9 @@ package org.apache.tajo.storage.parquet; +import parquet.hadoop.ParquetOutputFormat; +import parquet.hadoop.metadata.CompressionCodecName; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.Schema; @@ -34,6 +37,11 @@ import java.io.IOException; */ public class ParquetAppender extends FileAppender { private TajoParquetWriter writer; + private int blockSize; + private int pageSize; + private CompressionCodecName compressionCodecName; + private boolean enableDictionary; + private boolean validating; private TableStatistics stats; /** @@ -47,6 +55,16 @@ public class ParquetAppender extends FileAppender { public ParquetAppender(Configuration conf, Schema schema, TableMeta meta, Path path) throws IOException { super(conf, schema, meta, path); + this.blockSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.BLOCK_SIZE)); + this.pageSize = Integer.parseInt( + meta.getOption(ParquetOutputFormat.PAGE_SIZE)); + this.compressionCodecName = CompressionCodecName.fromConf( + meta.getOption(ParquetOutputFormat.COMPRESSION)); + this.enableDictionary = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY)); + this.validating = Boolean.parseBoolean( + meta.getOption(ParquetOutputFormat.VALIDATION)); } /** @@ -54,10 +72,17 @@ public class ParquetAppender extends FileAppender { * and initializes the table statistics if enabled. */ public void init() throws IOException { - writer = new TajoParquetWriter(path, schema); + writer = new TajoParquetWriter(path, + schema, + compressionCodecName, + blockSize, + pageSize, + enableDictionary, + validating); if (enabledStats) { this.stats = new TableStatistics(schema); } + super.init(); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java index 086f490..38d8ca4 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java @@ -57,6 +57,7 @@ public class ParquetScanner extends FileScanner { } reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets)); + super.init(); } /** http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java index 5427592..354fbc2 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestMergeScanner.java @@ -94,6 +94,7 @@ public class TestMergeScanner { Options options = new Options(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path table1Path = new Path(testDir, storeType + "_1.data"); Appender appender1 = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, table1Path); http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java index 0a38985..a500f09 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestStorages.java @@ -146,6 +146,7 @@ public class TestStorages { schema.addColumn("score", Type.FLOAT4); TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path tablePath = new Path(testDir, "testProjection.data"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); @@ -208,6 +209,7 @@ public class TestStorages { Options options = new Options(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path tablePath = new Path(testDir, "testVariousTypes.data"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); @@ -269,6 +271,7 @@ public class TestStorages { Options options = new Options(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); meta.putOption(CatalogConstants.CSVFILE_NULL, "\\\\N"); meta.putOption(CatalogConstants.RCFILE_NULL, "\\\\N"); meta.putOption(CatalogConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName()); http://git-wip-us.apache.org/repos/asf/tajo/blob/dc40d849/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java index ad49b36..140aa09 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/v2/TestStorages.java @@ -95,6 +95,7 @@ public class TestStorages { schema.addColumn("age", Type.INT8); TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path tablePath = new Path(testDir, "Splitable.data"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); appender.enableStats(); @@ -147,6 +148,7 @@ public class TestStorages { schema.addColumn("score", Type.FLOAT4); TableMeta meta = CatalogUtil.newTableMeta(storeType); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path tablePath = new Path(testDir, "testProjection.data"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath); @@ -207,6 +209,7 @@ public class TestStorages { Options options = new Options(); TableMeta meta = CatalogUtil.newTableMeta(storeType, options); + meta.setOptions(CatalogUtil.newOptionsWithDefault(storeType)); Path tablePath = new Path(testDir, "testVariousTypes.data"); Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
