Repository: tajo
Updated Branches:
  refs/heads/master ea1818ab9 -> ef43dfaa5
http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 5843c2d..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
-  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
-  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
-  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
-      CompressionCodecName.UNCOMPRESSED;
-  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
-  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
-  public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
-      ParquetProperties.WriterVersion.PARQUET_1_0;
-
-  private final InternalParquetRecordWriter<T> writer;
-
-  /**
-   * Create a new ParquetWriter.
-   * (with dictionary encoding enabled and validation off)
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold (both data and dictionary)
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        dictionaryPageSize, enableDictionary, validating,
-        DEFAULT_WRITER_VERSION);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
-   * configuration from the classpath.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
-   * @param conf Hadoop configuration to use while accessing the filesystem
-   * @throws java.io.IOException
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion,
-      Configuration conf) throws IOException {
-
-    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
-    MessageType schema = writeContext.getSchema();
-
-    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
-    fileWriter.start();
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
-    this.writer = new InternalParquetRecordWriter<>(
-        fileWriter,
-        writeSupport,
-        schema,
-        writeContext.getExtraMetaData(),
-        blockSize,
-        pageSize,
-        compressor,
-        dictionaryPageSize,
-        enableDictionary,
-        validating,
-        writerVersion);
-  }
-
-  /**
-   * Create a new ParquetWriter. The default block size is 128 MB. The default
-   * page size is 1 MB. Default compression is no compression. Dictionary encoding is enabled.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @throws java.io.IOException
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
-    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public void write(T object) throws IOException {
-    try {
-      writer.write(object);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return this.writer.getEstimatedWrittenSize();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index a08dfb9..44d8fdc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -98,6 +98,14 @@ public class TestStorages {
       "  ]\n" +
       "}\n";
 
+  private static String TEST_EMPTY_FIELD_AVRO_SCHEMA =
+      "{\n" +
+      "  \"type\": \"record\",\n" +
+      "  \"namespace\": \"org.apache.tajo\",\n" +
+      "  \"name\": \"testEmptySchema\",\n" +
+      "  \"fields\": []\n" +
+      "}\n";
+
   private static String TEST_MAX_VALUE_AVRO_SCHEMA =
       "{\n" +
       "  \"type\": \"record\",\n" +
@@ -1123,7 +1131,6 @@ public class TestStorages {
   public void testLessThanSchemaSize() throws IOException {
     /* Internal storage must have the same size as the schema */
     if (internalType || dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)
-        || dataFormat.equalsIgnoreCase(BuiltinStorages.PARQUET)
         || dataFormat.equalsIgnoreCase(BuiltinStorages.ORC)) {
       return;
     }
@@ -1356,4 +1363,59 @@ public class TestStorages {
     scanner.close();
     assertEquals(1.0f, scanner.getProgress(), 0.0f);
   }
+
+  @Test
+  public void testEmptySchema() throws IOException {
+    if (internalType) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat);
+    meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testEmptySchema.data");
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+
+    Tuple expect = new VTuple(schema.size());
+    expect.put(new Datum[]{
+        DatumFactory.createInt4(Integer.MAX_VALUE),
+        DatumFactory.createInt8(Long.MAX_VALUE),
+        DatumFactory.createFloat4(Float.MAX_VALUE)
+    });
+
+    appender.addTuple(expect);
+    appender.flush();
+    appender.close();
+
+    assertTrue(fs.exists(tablePath));
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_EMPTY_FIELD_AVRO_SCHEMA);
+    }
+
+    // e.g. select count(*) from table
+    Schema target = new Schema();
+    assertEquals(0, target.size());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+    scanner.init();
+
+    Tuple tuple = scanner.next();
+    assertNotNull(tuple);
+    assertEquals(0, tuple.size());
+    scanner.close();
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
index 35b9f07..ea223e7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.Tuple;
@@ -90,7 +91,7 @@ public class TestReadWrite {
     writer.write(tuple);
     writer.close();
 
-    TajoParquetReader reader = new TajoParquetReader(file, schema);
+    TajoParquetReader reader = new TajoParquetReader(new TajoConf(), file, schema, schema);
 
     tuple = reader.read();
     assertNotNull(tuple);

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index 07451d5..ba3f72e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -18,12 +18,12 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.junit.Test;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;
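For readers skimming the patch: the behavior the new testEmptySchema case pins
down is that a scanner opened with an empty target schema (the path taken by a
query such as "select count(*) from table") still yields one tuple per stored
row, just with zero columns. A condensed sketch of that scan phase follows; it
is illustrative only, and assumes meta, schema, tablePath, and status are
prepared exactly as in the test above:

    // Illustrative excerpt, not part of the commit; setup names
    // (meta, schema, tablePath, status) come from testEmptySchema.
    Schema target = new Schema();   // empty projection: no columns requested
    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
    scanner.init();

    Tuple tuple = scanner.next();   // a tuple is still produced per row...
    assertNotNull(tuple);
    assertEquals(0, tuple.size());  // ...but it carries no column values
    scanner.close();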
