Repository: tajo
Updated Branches:
  refs/heads/master ea1818ab9 -> ef43dfaa5


http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
deleted file mode 100644
index 5843c2d..0000000
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tajo.storage.thirdparty.parquet;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import parquet.column.ParquetProperties;
-import parquet.hadoop.api.WriteSupport;
-import parquet.hadoop.metadata.CompressionCodecName;
-import parquet.schema.MessageType;
-
-import java.io.Closeable;
-import java.io.IOException;
-
-public class ParquetWriter<T> implements Closeable {
-
-  public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
-  public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
-  public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
-      CompressionCodecName.UNCOMPRESSED;
-  public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
-  public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
-  public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
-      ParquetProperties.WriterVersion.PARQUET_1_0;
-
-  private final InternalParquetRecordWriter<T> writer;
-
-  /**
-   * Create a new ParquetWriter.
-   * (with dictionary encoding enabled and validation off)
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, boolean, boolean)
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold (both data and dictionary)
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize,
-        dictionaryPageSize, enableDictionary, validating,
-        DEFAULT_WRITER_VERSION);
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
-   * configuration from the classpath.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
-   * @throws java.io.IOException
-   * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, parquet.hadoop.metadata.CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion) throws IOException {
-    this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
-  }
-
-  /**
-   * Create a new ParquetWriter.
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @param compressionCodecName the compression codec to use
-   * @param blockSize the block size threshold
-   * @param pageSize the page size threshold
-   * @param dictionaryPageSize the page size threshold for the dictionary pages
-   * @param enableDictionary to turn dictionary encoding on
-   * @param validating to turn on validation using the schema
-   * @param writerVersion version of parquetWriter from {@link parquet.column.ParquetProperties.WriterVersion}
-   * @param conf Hadoop configuration to use while accessing the filesystem
-   * @throws java.io.IOException
-   */
-  public ParquetWriter(
-      Path file,
-      WriteSupport<T> writeSupport,
-      CompressionCodecName compressionCodecName,
-      int blockSize,
-      int pageSize,
-      int dictionaryPageSize,
-      boolean enableDictionary,
-      boolean validating,
-      ParquetProperties.WriterVersion writerVersion,
-      Configuration conf) throws IOException {
-
-    WriteSupport.WriteContext writeContext = writeSupport.init(conf);
-    MessageType schema = writeContext.getSchema();
-
-    ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
-    fileWriter.start();
-
-    CodecFactory codecFactory = new CodecFactory(conf);
-    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
-    this.writer = new InternalParquetRecordWriter<>(
-            fileWriter,
-            writeSupport,
-            schema,
-            writeContext.getExtraMetaData(),
-            blockSize,
-            pageSize,
-            compressor,
-            dictionaryPageSize,
-            enableDictionary,
-            validating,
-            writerVersion);
-  }
-
-  /**
-   * Create a new ParquetWriter. The default block size is 128 MB, the
-   * default page size is 1 MB, the default compression is UNCOMPRESSED,
-   * and dictionary encoding is enabled (see the DEFAULT_* constants above).
-   *
-   * @param file the file to create
-   * @param writeSupport the implementation to write a record to a RecordConsumer
-   * @throws java.io.IOException
-   */
-  public ParquetWriter(Path file, WriteSupport<T> writeSupport) throws IOException {
-    this(file, writeSupport, DEFAULT_COMPRESSION_CODEC_NAME, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
-  }
-
-  public void write(T object) throws IOException {
-    try {
-      writer.write(object);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public long getEstimatedWrittenSize() throws IOException {
-    return this.writer.getEstimatedWrittenSize();
-  }
-
-  @Override
-  public void close() throws IOException {
-    try {
-      writer.close();
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }}
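
For context, the deleted wrapper exposed a plain constructor-based API around
InternalParquetRecordWriter. A hypothetical usage sketch (MyRecord and
MyWriteSupport are illustrative placeholders, not classes from this patch):

    // Write records through a WriteSupport<T>; close() finalizes the footer.
    ParquetWriter<MyRecord> writer = new ParquetWriter<>(
        new Path("/tmp/out.parquet"),
        new MyWriteSupport(),                      // a WriteSupport<MyRecord>
        CompressionCodecName.UNCOMPRESSED,
        ParquetWriter.DEFAULT_BLOCK_SIZE,          // 128 MB row-group threshold
        ParquetWriter.DEFAULT_PAGE_SIZE);          // 1 MB page threshold
    try {
      for (MyRecord r : records) {
        writer.write(r);
      }
    } finally {
      writer.close();
    }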

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index a08dfb9..44d8fdc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -98,6 +98,14 @@ public class TestStorages {
       "  ]\n" +
       "}\n";
 
+  private static String TEST_EMPTY_FIELD_AVRO_SCHEMA =
+      "{\n" +
+          "  \"type\": \"record\",\n" +
+          "  \"namespace\": \"org.apache.tajo\",\n" +
+          "  \"name\": \"testEmptySchema\",\n" +
+          "  \"fields\": []\n" +
+          "}\n";
+
   private static String TEST_MAX_VALUE_AVRO_SCHEMA =
       "{\n" +
           "  \"type\": \"record\",\n" +
@@ -1123,7 +1131,6 @@ public class TestStorages {
   public void testLessThanSchemaSize() throws IOException {
     /* Internal storage must be same with schema size */
     if (internalType || dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)
-        || dataFormat.equalsIgnoreCase(BuiltinStorages.PARQUET)
         || dataFormat.equalsIgnoreCase(BuiltinStorages.ORC)) {
       return;
     }
@@ -1356,4 +1363,59 @@ public class TestStorages {
     scanner.close();
     assertEquals(1.0f, scanner.getProgress(), 0.0f);
   }
+
+  @Test
+  public void testEmptySchema() throws IOException {
+    if (internalType) return;
+
+    Schema schema = new Schema();
+    schema.addColumn("id", Type.INT4);
+    schema.addColumn("age", Type.INT8);
+    schema.addColumn("score", Type.FLOAT4);
+
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat);
+    meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_PROJECTION_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testEmptySchema.data");
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+
+    Tuple expect = new VTuple(schema.size());
+    expect.put(new Datum[]{
+        DatumFactory.createInt4(Integer.MAX_VALUE),
+        DatumFactory.createInt8(Long.MAX_VALUE),
+        DatumFactory.createFloat4(Float.MAX_VALUE)
+    });
+
+    appender.addTuple(expect);
+    appender.flush();
+    appender.close();
+
+    assertTrue(fs.exists(tablePath));
+    FileStatus status = fs.getFileStatus(tablePath);
+
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL,
+          TEST_EMPTY_FIELD_AVRO_SCHEMA);
+    }
+
+    // e.g. select count(*) from table
+    Schema target = new Schema();
+    assertEquals(0, target.size());
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+    scanner.init();
+
+    Tuple tuple = scanner.next();
+    assertNotNull(tuple);
+    assertEquals(0, tuple.size());
+    scanner.close();
+  }
 }
\ No newline at end of file
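
The new testEmptySchema exercises the zero-column projection path that backs
queries such as "select count(*)": the scanner must keep returning one tuple
per row even when the target schema has no columns. A minimal sketch of the
same pattern, assuming meta, schema, and fragment are prepared as in the test
above:

    // Count rows without materializing any column values.
    Schema target = new Schema();                  // empty projection
    Scanner scanner = TablespaceManager.getLocalFs()
        .getScanner(meta, schema, fragment, target);
    scanner.init();
    long rows = 0;
    while (scanner.next() != null) {
      rows++;                                      // each tuple has size() == 0
    }
    scanner.close();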

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
index 35b9f07..ea223e7 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestReadWrite.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
+import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.Tuple;
@@ -90,7 +91,7 @@ public class TestReadWrite {
     writer.write(tuple);
     writer.close();
 
-    TajoParquetReader reader = new TajoParquetReader(file, schema);
+    TajoParquetReader reader = new TajoParquetReader(new TajoConf(), file, schema, schema);
     tuple = reader.read();
 
     assertNotNull(tuple);
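
The TajoParquetReader constructor now takes a TajoConf plus separate table and
target schemas; passing the full schema twice, as above, reads every column. A
hypothetical projected read would pass a narrower target schema instead (the
column name is illustrative, and the reader is assumed to follow the usual
read()/close() contract):

    // Project a single column out of the Parquet file.
    Schema target = new Schema();
    target.addColumn("myint", Type.INT4);

    TajoParquetReader reader =
        new TajoParquetReader(new TajoConf(), file, schema, target);
    Tuple projected = reader.read();               // null once rows run out
    reader.close();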

http://git-wip-us.apache.org/repos/asf/tajo/blob/ef43dfaa/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index 07451d5..ba3f72e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -18,12 +18,12 @@
 
 package org.apache.tajo.storage.parquet;
 
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
 import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.common.TajoDataTypes.Type;
 import org.junit.Test;
-import parquet.schema.MessageType;
-import parquet.schema.MessageTypeParser;
 
 import java.util.ArrayList;
 import java.util.List;
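
This import swap tracks the Parquet package rename (the parquet.* classes
moved to org.apache.parquet.* in the newer parquet-mr releases); the parser
API itself is unchanged. For example:

    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    // Same API, new package: parse a message type from its string form.
    MessageType type = MessageTypeParser.parseMessageType(
        "message table_schema { optional int32 myint; }");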
