Github user blrunner commented on a diff in the pull request:
https://github.com/apache/tajo/pull/75#discussion_r14917894
--- Diff: tajo-storage/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetWriter.java ---
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.column.ParquetProperties;
+import parquet.hadoop.api.WriteSupport;
+import parquet.hadoop.metadata.CompressionCodecName;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+public class ParquetWriter<T> implements Closeable {
+
+ public static final int DEFAULT_BLOCK_SIZE = 128 * 1024 * 1024;
+ public static final int DEFAULT_PAGE_SIZE = 1 * 1024 * 1024;
+ public static final CompressionCodecName DEFAULT_COMPRESSION_CODEC_NAME =
+ CompressionCodecName.UNCOMPRESSED;
+ public static final boolean DEFAULT_IS_DICTIONARY_ENABLED = true;
+ public static final boolean DEFAULT_IS_VALIDATING_ENABLED = false;
+ public static final ParquetProperties.WriterVersion DEFAULT_WRITER_VERSION =
+ ParquetProperties.WriterVersion.PARQUET_1_0;
+
+ private final InternalParquetRecordWriter<T> writer;
+
+ /**
+ * Create a new ParquetWriter.
+ * (with dictionary encoding enabled and validation off)
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @throws java.io.IOException
+ * @see #ParquetWriter(org.apache.hadoop.fs.Path, parquet.hadoop.api.WriteSupport, CompressionCodecName, int, int, boolean, boolean)
+ */
+ public ParquetWriter(Path file, WriteSupport<T> writeSupport, CompressionCodecName compressionCodecName, int blockSize, int pageSize) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ DEFAULT_IS_DICTIONARY_ENABLED, DEFAULT_IS_VALIDATING_ENABLED);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold (both data and dictionary)
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws IOException
+ * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, pageSize, enableDictionary, validating);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @throws IOException
+ * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize,
+ dictionaryPageSize, enableDictionary, validating,
+ DEFAULT_WRITER_VERSION);
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * Directly instantiates a Hadoop {@link org.apache.hadoop.conf.Configuration} which reads
+ * configuration from the classpath.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
+ * @throws IOException
+ * @see #ParquetWriter(Path, WriteSupport, CompressionCodecName, int, int, int, boolean, boolean, parquet.column.ParquetProperties.WriterVersion, org.apache.hadoop.conf.Configuration)
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion) throws IOException {
+ this(file, writeSupport, compressionCodecName, blockSize, pageSize, dictionaryPageSize, enableDictionary, validating, writerVersion, new Configuration());
+ }
+
+ /**
+ * Create a new ParquetWriter.
+ *
+ * @param file the file to create
+ * @param writeSupport the implementation to write a record to a RecordConsumer
+ * @param compressionCodecName the compression codec to use
+ * @param blockSize the block size threshold
+ * @param pageSize the page size threshold
+ * @param dictionaryPageSize the page size threshold for the dictionary pages
+ * @param enableDictionary to turn dictionary encoding on
+ * @param validating to turn on validation using the schema
+ * @param writerVersion version of parquetWriter from {@link ParquetProperties.WriterVersion}
+ * @param conf Hadoop configuration to use while accessing the filesystem
+ * @throws IOException
+ */
+ public ParquetWriter(
+ Path file,
+ WriteSupport<T> writeSupport,
+ CompressionCodecName compressionCodecName,
+ int blockSize,
+ int pageSize,
+ int dictionaryPageSize,
+ boolean enableDictionary,
+ boolean validating,
+ ParquetProperties.WriterVersion writerVersion,
+ Configuration conf) throws IOException {
+
+ WriteSupport.WriteContext writeContext = writeSupport.init(conf);
+ MessageType schema = writeContext.getSchema();
+
+ ParquetFileWriter fileWriter = new ParquetFileWriter(conf, schema, file);
+ fileWriter.start();
+
+ CodecFactory codecFactory = new CodecFactory(conf);
+ CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compressionCodecName, 0);
+ this.writer = new InternalParquetRecordWriter<T>(
+ fileWriter,
+ writeSupport,
+ schema,
+ writeContext.getExtraMetaData(),
+ blockSize,
+ pageSize,
+ compressor,
+ dictionaryPageSize,
+ enableDictionary,
+ validating,
+ writerVersion);
+ }
+
+ /**
+ * Create a new ParquetWriter. The default block size is 50 MB.The default
--- End diff ---
Is this a typo? You already set DEFAULT_BLOCK_SIZE to 128 MB above, but the Javadoc here says the default block size is 50 MB.
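
For reference, a minimal caller sketch assuming the five-argument constructor earlier in this diff and a hypothetical WriteSupport<Tuple> implementation named TupleWriteSupport (not part of this patch); it shows which value a caller actually gets when relying on the class constants:

    // Hypothetical caller; TupleWriteSupport and "schema" are assumed, not part of this diff.
    Path file = new Path("/tmp/example.parquet");
    ParquetWriter<Tuple> writer = new ParquetWriter<Tuple>(
        file,
        new TupleWriteSupport(schema),
        ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,  // UNCOMPRESSED
        ParquetWriter.DEFAULT_BLOCK_SIZE,              // 128 * 1024 * 1024, i.e. 128 MB
        ParquetWriter.DEFAULT_PAGE_SIZE);              // 1 MB
    // ... write records, then release resources ...
    writer.close();

So the "50 MB" in the Javadoc does not correspond to any default defined in this class.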