Repository: parquet-mr
Updated Branches:
  refs/heads/master b301d1270 -> 30aa91012


PARQUET-601: Add support to configure the encoding used by ValueWriters

### Context:
Parquet currently chooses the value writer based on the type of the column and 
the Parquet version. The writers (and hence the encodings) for each data type 
are hard-coded in the Parquet source code.

This PR adds support for overriding the encodings per type via config. That 
lets users experiment with various encoding strategies manually and override 
the hard-coded defaults if they don't suit their use case.

Encodings can be overridden per data type (int32 / int64 / ...), with 
something along the lines of:
```
parquet.writer.encoding-override.<type> = "encoding1[,encoding2]"
```

As an example:
```
"parquet.writer.encoding-override.int32" = "plain"
(Chooses Plain encoding and hence the PlainValuesWriter).
```

When both a primary and a fallback encoding need to be specified, we can do the 
following:
```
"parquet.writer.encoding-override.binary" = "rle_dictionary,delta_byte_array"
(Chooses RLE_DICTIONARY encoding as the initial encoding and DELTA_BYTE_ARRAY 
encoding as the fallback and hence creates a 
FallbackValuesWriter(PlainBinaryDictionaryValuesWriter, DeltaByteArrayWriter)).
```

In such cases we can mandate that the writer for the first encoding listed 
supports fallback by implementing 
[RequiresFallback](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/column/values/RequiresFallback.java#L31).
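
As a rough illustration of the `encoding1[,encoding2]` format described above, 
the sketch below parses an override value into a primary encoding and an 
optional fallback, and rejects a primary that cannot fall back. It is not code 
from this PR: `EncodingOverrideSketch`, `Enc` and `allowsFallback` are 
hypothetical stand-ins (the real check would be whether the primary writer 
implements RequiresFallback), and treating only the dictionary encodings as 
fallback-capable is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration only; not part of parquet-mr.
final class EncodingOverrideSketch {

  // Stand-in for a subset of org.apache.parquet.column.Encoding.
  enum Enc {
    PLAIN, PLAIN_DICTIONARY, RLE_DICTIONARY, DELTA_BINARY_PACKED, DELTA_BYTE_ARRAY;

    // Assumption: only dictionary encodings can hand over to a fallback writer.
    boolean allowsFallback() {
      return this == PLAIN_DICTIONARY || this == RLE_DICTIONARY;
    }
  }

  // Parses "encoding1[,encoding2]" into [primary] or [primary, fallback].
  static List<Enc> parse(String value) {
    // Limit -1 keeps trailing empty parts so "plain," is rejected below.
    String[] parts = value.trim().split("\\s*,\\s*", -1);
    if (parts.length > 2) {
      throw new IllegalArgumentException("At most two encodings allowed: " + value);
    }
    List<Enc> result = new ArrayList<>();
    for (String part : parts) {
      // Enum.valueOf throws IllegalArgumentException for unknown or empty names.
      result.add(Enc.valueOf(part.toUpperCase(Locale.ROOT)));
    }
    if (result.size() == 2 && !result.get(0).allowsFallback()) {
      throw new IllegalArgumentException(result.get(0) + " does not support a fallback");
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(parse("plain"));
    System.out.println(parse("rle_dictionary,delta_byte_array"));
  }
}
```

Validating the primary encoding at parse time means a bad override fails when 
the configuration is read rather than when the first page is written.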

### PR notes:

- Restructured the ValuesWriter creation code: pulled it out of 
ParquetProperties into a new class and refactored the flow based on type, as 
it was getting hard to follow and adding the overrides would have made it 
harder. Added unit tests to verify the ValuesWriter we create for 
combinations of type, Parquet version and dictionary on / off.
- Added unit tests to verify parsing of the encoding overrides and creation of 
ValuesWriters based on these overrides.
- Manually tested some encoding override scenarios on Hadoop (both Parquet 
v1 and v2).
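
The shape of the restructured creation code can be sketched with stand-in 
types. This is a simplified model, not the committed classes: writers are 
represented by their class names as strings, dictionary encoding is assumed to 
be enabled, only three column types are covered, and the idea that the default 
factory selects a per-version delegate when it is initialized is an assumption 
based on the file names in this commit.

```java
// Stand-in types only; the real classes live under org.apache.parquet.column.
final class WriterFactorySketch {

  enum Version { V1, V2 }
  enum Type { BOOLEAN, INT32, BINARY }

  interface Factory {
    void initialize(Props props);
    String newValuesWriter(Type type); // writer described by name to stay self-contained
  }

  static final class Props {
    final Version version;
    private final Factory factory;

    private Props(Version version, Factory factory) {
      this.version = version;
      this.factory = factory;
    }

    // Mirrors Builder.build() in the diff: construct the properties first,
    // then hand them to the still-uninitialized factory.
    static Props build(Version version, Factory factory) {
      Props props = new Props(version, factory);
      factory.initialize(props);
      return props;
    }

    String newValuesWriter(Type type) {
      return factory.newValuesWriter(type);
    }
  }

  // Assumption: the default factory picks a per-version delegate on initialize.
  static final class DefaultFactory implements Factory {
    private Factory delegate;

    @Override
    public void initialize(Props props) {
      delegate = props.version == Version.V1 ? new V1Factory() : new V2Factory();
      delegate.initialize(props);
    }

    @Override
    public String newValuesWriter(Type type) {
      return delegate.newValuesWriter(type);
    }
  }

  static final class V1Factory implements Factory {
    @Override
    public void initialize(Props props) { }

    @Override
    public String newValuesWriter(Type type) {
      switch (type) {
        case BOOLEAN: return "BooleanPlainValuesWriter"; // no dictionary for boolean
        default: return "Fallback(dictionary, PlainValuesWriter)";
      }
    }
  }

  static final class V2Factory implements Factory {
    @Override
    public void initialize(Props props) { }

    @Override
    public String newValuesWriter(Type type) {
      switch (type) {
        case BOOLEAN: return "RunLengthBitPackingHybridValuesWriter";
        case INT32: return "Fallback(dictionary, DeltaBinaryPackingValuesWriterForInteger)";
        default: return "Fallback(dictionary, DeltaByteArrayWriter)";
      }
    }
  }
}
```

Calling `WriterFactorySketch.Props.build(Version.V2, new DefaultFactory())` and 
then `newValuesWriter(Type.INT32)` walks the same path as 
`ParquetProperties.newValuesWriter` delegating to its factory.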

Author: Piyush Narang <[email protected]>

Closes #342 from piyushnarang/master and squashes the following commits:

3ebab28 [Piyush Narang] Remove Configurable
149bb98 [Piyush Narang] Switch to getValuesWriterFactory call to non-static
0b78e04 [Piyush Narang] Address Ryan's feedback
1da6ca3 [Piyush Narang] Merge branch 'master' into piyush/dynamic-encoding-overrides
f021ed2 [Piyush Narang] Tweak comment in ValuesWriterFactory
cb02ea0 [Piyush Narang] Fix review comments
bf4bc6d [Piyush Narang] Add support for Config setting in ValuesWriter factory
8a852a3 [Piyush Narang] Log values writer factory chosen
e4b61a4 [Piyush Narang] Tweak factory instantiation a bit
b46cccd [Piyush Narang] Add class based factory override
6a5428f [Piyush Narang] Clean up some stuff in ValuesWriterFactory
0f8cd09 [Piyush Narang] Refactor mockito version
9ead61d [Piyush Narang] Add guava test dep
5c636c7 [Piyush Narang] Add encoding-overrides config to ParquetOutputFormat config
b9d6c13 [Piyush Narang] Refactor code in ValuesWriterFactory a bit
ff4c90d [Piyush Narang] Pull out value writer creation to ValuesWriterFactory and add unit tests


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/30aa9101
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/30aa9101
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/30aa9101

Branch: refs/heads/master
Commit: 30aa91012cf6019bb9720609c1d03b5386a87ffb
Parents: b301d12
Author: Piyush Narang <[email protected]>
Authored: Thu Aug 11 13:30:43 2016 -0700
Committer: Alex Levenson <[email protected]>
Committed: Thu Aug 11 13:30:43 2016 -0700

----------------------------------------------------------------------
 parquet-avro/pom.xml                            |   2 +-
 parquet-cascading/pom.xml                       |   2 +-
 parquet-cascading3/pom.xml                      |   2 +-
 parquet-column/pom.xml                          |  12 +
 .../parquet/column/ParquetProperties.java       | 176 ++--------
 .../factory/DefaultV1ValuesWriterFactory.java   | 111 ++++++
 .../factory/DefaultV2ValuesWriterFactory.java   | 115 ++++++
 .../factory/DefaultValuesWriterFactory.java     |  87 +++++
 .../values/factory/ValuesWriterFactory.java     |  47 +++
 .../factory/DefaultValuesWriterFactoryTest.java | 350 +++++++++++++++++++
 parquet-hadoop/pom.xml                          |   4 +-
 .../parquet/hadoop/ParquetOutputFormat.java     |  23 +-
 parquet-pig/pom.xml                             |   2 +-
 parquet-protobuf/pom.xml                        |   2 +-
 parquet-tools/pom.xml                           |   2 +-
 pom.xml                                         |   2 +
 16 files changed, 782 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-avro/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml
index 50c37db..109cc38 100644
--- a/parquet-avro/pom.xml
+++ b/parquet-avro/pom.xml
@@ -67,7 +67,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-cascading/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml
index 0e1e1e1..0573aba 100644
--- a/parquet-cascading/pom.xml
+++ b/parquet-cascading/pom.xml
@@ -77,7 +77,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-cascading3/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml
index 67b5e09..9aa8991 100644
--- a/parquet-cascading3/pom.xml
+++ b/parquet-cascading3/pom.xml
@@ -88,7 +88,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml
index ccceafa..014921c 100644
--- a/parquet-column/pom.xml
+++ b/parquet-column/pom.xml
@@ -83,6 +83,18 @@
       <version>${slf4j.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index e3881f8..e746811 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -24,30 +24,15 @@ import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
-import static org.apache.parquet.column.Encoding.PLAIN;
-import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
-import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.impl.ColumnWriteStoreV2;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
-import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
-import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainFloatDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter;
-import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.PlainLongDictionaryValuesWriter;
-import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
-import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
-import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
-import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.factory.DefaultValuesWriterFactory;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridEncoder;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.column.values.factory.ValuesWriterFactory;
 import org.apache.parquet.schema.MessageType;
 
 /**
@@ -66,6 +51,8 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
 
+  public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
+
   private static final int MIN_SLAB_SIZE = 64;
 
   public enum WriterVersion {
@@ -89,6 +76,7 @@ public class ParquetProperties {
     }
   }
 
+  private final int initialSlabSize;
   private final int pageSizeThreshold;
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
@@ -97,14 +85,14 @@ public class ParquetProperties {
   private final int maxRowCountForPageSizeCheck;
   private final boolean estimateNextSizeCheck;
   private final ByteBufferAllocator allocator;
-
-  private final int initialSlabSize;
+  private final ValuesWriterFactory valuesWriterFactory;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
-                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator) {
+                            int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
+                            ValuesWriterFactory writerFactory) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
-        .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
+      .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dictionaryPageSizeThreshold = dictPageSize;
     this.writerVersion = writerVersion;
     this.enableDictionary = enableDict;
@@ -112,6 +100,8 @@ public class ParquetProperties {
     this.maxRowCountForPageSizeCheck = maxRowCountForPageSizeCheck;
     this.estimateNextSizeCheck = estimateNextSizeCheck;
     this.allocator = allocator;
+
+    this.valuesWriterFactory = writerFactory;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -144,128 +134,18 @@ public class ParquetProperties {
         getWidthFromMaxInt(maxLevel), MIN_SLAB_SIZE, pageSizeThreshold, allocator);
   }
 
-  private ValuesWriter plainWriter(ColumnDescriptor path) {
-    switch (path.getType()) {
-    case BOOLEAN:
-      return new BooleanPlainValuesWriter();
-    case INT96:
-      return new FixedLenByteArrayPlainValuesWriter(12, initialSlabSize, pageSizeThreshold, allocator);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSlabSize, pageSizeThreshold, allocator);
-    case BINARY:
-    case INT32:
-    case INT64:
-    case DOUBLE:
-    case FLOAT:
-      return new PlainValuesWriter(initialSlabSize, pageSizeThreshold, allocator);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path) {
-    Encoding encodingForDataPage;
-    Encoding encodingForDictionaryPage;
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      encodingForDataPage = PLAIN_DICTIONARY;
-      encodingForDictionaryPage = PLAIN_DICTIONARY;
-      break;
-    case PARQUET_2_0:
-      encodingForDataPage = RLE_DICTIONARY;
-      encodingForDictionaryPage = PLAIN;
-      break;
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-    switch (path.getType()) {
-    case BOOLEAN:
-      throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
-    case BINARY:
-      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT32:
-      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT64:
-      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case INT96:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case DOUBLE:
-      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case FLOAT:
-      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    case FIXED_LEN_BYTE_ARRAY:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage, this.allocator);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
-  }
-
-  private ValuesWriter writerToFallbackTo(ColumnDescriptor path) {
-    switch(writerVersion) {
-    case PARQUET_1_0:
-      return plainWriter(path);
-    case PARQUET_2_0:
-      switch (path.getType()) {
-      case BOOLEAN:
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSlabSize, pageSizeThreshold, allocator);
-      case BINARY:
-      case FIXED_LEN_BYTE_ARRAY:
-        return new DeltaByteArrayWriter(initialSlabSize, pageSizeThreshold, allocator);
-      case INT32:
-        return new DeltaBinaryPackingValuesWriterForInteger(initialSlabSize, pageSizeThreshold, allocator);
-      case INT64:
-        return new DeltaBinaryPackingValuesWriterForLong(initialSlabSize, pageSizeThreshold, allocator);
-      case INT96:
-      case DOUBLE:
-      case FLOAT:
-        return plainWriter(path);
-      default:
-        throw new IllegalArgumentException("Unknown type " + path.getType());
-      }
-    default:
-      throw new IllegalArgumentException("Unknown version: " + writerVersion);
-    }
-  }
-
-  private ValuesWriter dictWriterWithFallBack(ColumnDescriptor path) {
-    ValuesWriter writerToFallBackTo = writerToFallbackTo(path);
-    if (enableDictionary) {
-      return FallbackValuesWriter.of(
-          dictionaryWriter(path),
-          writerToFallBackTo);
-    } else {
-     return writerToFallBackTo;
-    }
-  }
-
   public ValuesWriter newValuesWriter(ColumnDescriptor path) {
-    switch (path.getType()) {
-    case BOOLEAN: // no dictionary encoding for boolean
-      return writerToFallbackTo(path);
-    case FIXED_LEN_BYTE_ARRAY:
-      // dictionary encoding for that type was not enabled in PARQUET 1.0
-      if (writerVersion == WriterVersion.PARQUET_2_0) {
-        return dictWriterWithFallBack(path);
-      } else {
-       return writerToFallbackTo(path);
-      }
-    case BINARY:
-    case INT32:
-    case INT64:
-    case INT96:
-    case DOUBLE:
-    case FLOAT:
-      return dictWriterWithFallBack(path);
-    default:
-      throw new IllegalArgumentException("Unknown type " + path.getType());
-    }
+    return valuesWriterFactory.newValuesWriter(path);
   }
 
   public int getPageSizeThreshold() {
     return pageSizeThreshold;
   }
 
+  public int getInitialSlabSize() {
+    return initialSlabSize;
+  }
+
   public int getDictionaryPageSizeThreshold() {
     return dictionaryPageSizeThreshold;
   }
@@ -302,6 +182,10 @@ public class ParquetProperties {
     return maxRowCountForPageSizeCheck;
   }
 
+  public ValuesWriterFactory getValuesWriterFactory() {
+    return valuesWriterFactory;
+  }
+
   public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
@@ -323,6 +207,7 @@ public class ParquetProperties {
     private int maxRowCountForPageSizeCheck = DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK;
     private boolean estimateNextSizeCheck = DEFAULT_ESTIMATE_ROW_COUNT_FOR_PAGE_SIZE_CHECK;
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
+    private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
 
     private Builder() {
     }
@@ -411,10 +296,25 @@ public class ParquetProperties {
       return this;
     }
 
+    public Builder withValuesWriterFactory(ValuesWriterFactory factory) {
+      Preconditions.checkNotNull(factory, "ValuesWriterFactory");
+      this.valuesWriterFactory = factory;
+      return this;
+    }
+
     public ParquetProperties build() {
-      return new ParquetProperties(writerVersion, pageSize, dictPageSize,
+      ParquetProperties properties =
+        new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator);
+          estimateNextSizeCheck, allocator, valuesWriterFactory);
+      // we pass a constructed but uninitialized factory to ParquetProperties above as currently
+      // creation of ValuesWriters is invoked from within ParquetProperties. In the future
+      // we'd like to decouple that and won't need to pass an object to properties and then pass the
+      // properties to the object.
+      valuesWriterFactory.initialize(properties);
+
+      return properties;
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
new file mode 100644
index 0000000..ffbd950
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV1ValuesWriterFactory.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+
+import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
+
+public class DefaultV1ValuesWriterFactory implements ValuesWriterFactory {
+
+  private ParquetProperties parquetProperties;
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    this.parquetProperties = properties;
+  }
+
+  private Encoding getEncodingForDataPage() {
+    return PLAIN_DICTIONARY;
+  }
+
+  private Encoding getEncodingForDictionaryPage() {
+    return PLAIN_DICTIONARY;
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    switch (descriptor.getType()) {
+      case BOOLEAN:
+        return getBooleanValuesWriter();
+      case FIXED_LEN_BYTE_ARRAY:
+        return getFixedLenByteArrayValuesWriter(descriptor);
+      case BINARY:
+        return getBinaryValuesWriter(descriptor);
+      case INT32:
+        return getInt32ValuesWriter(descriptor);
+      case INT64:
+        return getInt64ValuesWriter(descriptor);
+      case INT96:
+        return getInt96ValuesWriter(descriptor);
+      case DOUBLE:
+        return getDoubleValuesWriter(descriptor);
+      case FLOAT:
+        return getFloatValuesWriter(descriptor);
+      default:
+        throw new IllegalArgumentException("Unknown type " + descriptor.getType());
+    }
+  }
+
+  private ValuesWriter getBooleanValuesWriter() {
+    // no dictionary encoding for boolean
+    return new BooleanPlainValuesWriter();
+  }
+
+  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) {
+    // dictionary encoding was not enabled in PARQUET 1.0
+    return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+  }
+
+  private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new FixedLenByteArrayPlainValuesWriter(12, parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
new file mode 100644
index 0000000..8348a02
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultV2ValuesWriterFactory.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+
+import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
+
+public class DefaultV2ValuesWriterFactory implements ValuesWriterFactory {
+
+  private ParquetProperties parquetProperties;
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    this.parquetProperties = properties;
+  }
+
+  private Encoding getEncodingForDataPage() {
+    return RLE_DICTIONARY;
+  }
+
+  private Encoding getEncodingForDictionaryPage() {
+    return PLAIN;
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    switch (descriptor.getType()) {
+      case BOOLEAN:
+        return getBooleanValuesWriter();
+      case FIXED_LEN_BYTE_ARRAY:
+        return getFixedLenByteArrayValuesWriter(descriptor);
+      case BINARY:
+        return getBinaryValuesWriter(descriptor);
+      case INT32:
+        return getInt32ValuesWriter(descriptor);
+      case INT64:
+        return getInt64ValuesWriter(descriptor);
+      case INT96:
+        return getInt96ValuesWriter(descriptor);
+      case DOUBLE:
+        return getDoubleValuesWriter(descriptor);
+      case FLOAT:
+        return getFloatValuesWriter(descriptor);
+      default:
+        throw new IllegalArgumentException("Unknown type " + descriptor.getType());
+    }
+  }
+
+  private ValuesWriter getBooleanValuesWriter() {
+    // no dictionary encoding for boolean
+    return new RunLengthBitPackingHybridValuesWriter(1, 
parquetProperties.getInitialSlabSize(), 
parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+  }
+
+  private ValuesWriter getFixedLenByteArrayValuesWriter(ColumnDescriptor path) 
{
+    ValuesWriter fallbackWriter = new 
DeltaByteArrayWriter(parquetProperties.getInitialSlabSize(), 
parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, 
parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), 
fallbackWriter);
+  }
+
+  private ValuesWriter getBinaryValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new 
DeltaByteArrayWriter(parquetProperties.getInitialSlabSize(), 
parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, 
parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), 
fallbackWriter);
+  }
+
+  private ValuesWriter getInt32ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new 
DeltaBinaryPackingValuesWriterForInteger(parquetProperties.getInitialSlabSize(),
 parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, 
parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), 
fallbackWriter);
+  }
+
+  private ValuesWriter getInt64ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new 
DeltaBinaryPackingValuesWriterForLong(parquetProperties.getInitialSlabSize(), 
parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, 
parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), 
fallbackWriter);
+  }
+
+  private ValuesWriter getInt96ValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new FixedLenByteArrayPlainValuesWriter(12, parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getDoubleValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+
+  private ValuesWriter getFloatValuesWriter(ColumnDescriptor path) {
+    ValuesWriter fallbackWriter = new PlainValuesWriter(parquetProperties.getInitialSlabSize(), parquetProperties.getPageSizeThreshold(), parquetProperties.getAllocator());
+    return DefaultValuesWriterFactory.dictWriterWithFallBack(path, parquetProperties, getEncodingForDictionaryPage(), getEncodingForDataPage(), fallbackWriter);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
new file mode 100644
index 0000000..6584894
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactory.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+
+/**
+ * Handles ValuesWriter creation statically based on the types of the columns and the writer version.
+ */
+public class DefaultValuesWriterFactory implements ValuesWriterFactory {
+
+  private ValuesWriterFactory delegateFactory;
+
+  private static final ValuesWriterFactory DEFAULT_V1_WRITER_FACTORY = new DefaultV1ValuesWriterFactory();
+  private static final ValuesWriterFactory DEFAULT_V2_WRITER_FACTORY = new DefaultV2ValuesWriterFactory();
+
+  @Override
+  public void initialize(ParquetProperties properties) {
+    if (properties.getWriterVersion() == WriterVersion.PARQUET_1_0) {
+      delegateFactory = DEFAULT_V1_WRITER_FACTORY;
+    } else {
+      delegateFactory = DEFAULT_V2_WRITER_FACTORY;
+    }
+
+    delegateFactory.initialize(properties);
+  }
+
+  @Override
+  public ValuesWriter newValuesWriter(ColumnDescriptor descriptor) {
+    return delegateFactory.newValuesWriter(descriptor);
+  }
+
+  static DictionaryValuesWriter dictionaryWriter(ColumnDescriptor path, ParquetProperties properties, Encoding dictPageEncoding, Encoding dataPageEncoding) {
+    switch (path.getType()) {
+      case BOOLEAN:
+        throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
+      case BINARY:
+        return new DictionaryValuesWriter.PlainBinaryDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT32:
+        return new DictionaryValuesWriter.PlainIntegerDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT64:
+        return new DictionaryValuesWriter.PlainLongDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case INT96:
+        return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), 12, dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case DOUBLE:
+        return new DictionaryValuesWriter.PlainDoubleDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case FLOAT:
+        return new DictionaryValuesWriter.PlainFloatDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      case FIXED_LEN_BYTE_ARRAY:
+        return new DictionaryValuesWriter.PlainFixedLenArrayDictionaryValuesWriter(properties.getDictionaryPageSizeThreshold(), path.getTypeLength(), dataPageEncoding, dictPageEncoding, properties.getAllocator());
+      default:
+        throw new IllegalArgumentException("Unknown type " + path.getType());
+    }
+  }
+
+  static ValuesWriter dictWriterWithFallBack(ColumnDescriptor path, ParquetProperties parquetProperties, Encoding dictPageEncoding, Encoding dataPageEncoding, ValuesWriter writerToFallBackTo) {
+    if (parquetProperties.isEnableDictionary()) {
+      return FallbackValuesWriter.of(
+        dictionaryWriter(path, parquetProperties, dictPageEncoding, dataPageEncoding),
+        writerToFallBackTo);
+    } else {
+      return writerToFallBackTo;
+    }
+  }
+}
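
The selection rule in dictWriterWithFallBack above can be tried out without the Parquet classes. The following is a minimal sketch, not the committed code: SketchWriter, NamedWriter, SketchFallbackWriter, WriterSelection and FallbackDemo are hypothetical stand-ins, and a Supplier takes the place of the dictionaryWriter(...) call so that the dictionary writer is only built when dictionary encoding is enabled.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for ValuesWriter; it only carries a name for inspection.
interface SketchWriter {
  String name();
}

final class NamedWriter implements SketchWriter {
  private final String name;
  NamedWriter(String name) { this.name = name; }
  @Override public String name() { return name; }
}

// Hypothetical stand-in for FallbackValuesWriter: holds an initial and a fallback writer.
final class SketchFallbackWriter implements SketchWriter {
  final SketchWriter initialWriter;
  final SketchWriter fallBackWriter;

  SketchFallbackWriter(SketchWriter initialWriter, SketchWriter fallBackWriter) {
    this.initialWriter = initialWriter;
    this.fallBackWriter = fallBackWriter;
  }

  @Override public String name() {
    return "Fallback(" + initialWriter.name() + ", " + fallBackWriter.name() + ")";
  }
}

final class WriterSelection {
  private WriterSelection() {}

  // Same shape as dictWriterWithFallBack in the diff: wrap the fallback writer
  // only when dictionary encoding is enabled, otherwise return it unchanged.
  static SketchWriter dictWriterWithFallBack(boolean enableDictionary,
                                             Supplier<SketchWriter> dictionaryWriter,
                                             SketchWriter writerToFallBackTo) {
    if (enableDictionary) {
      return new SketchFallbackWriter(dictionaryWriter.get(), writerToFallBackTo);
    }
    return writerToFallBackTo;
  }
}

final class FallbackDemo {
  public static void main(String[] args) {
    SketchWriter on = WriterSelection.dictWriterWithFallBack(
        true, () -> new NamedWriter("dict"), new NamedWriter("plain"));
    SketchWriter off = WriterSelection.dictWriterWithFallBack(
        false, () -> new NamedWriter("dict"), new NamedWriter("plain"));
    System.out.println(on.name());
    System.out.println(off.name());
  }
}
```

With the flag on, the result wraps both writers; with it off, the fallback writer is returned directly, which matches what the *_NoDict tests in this commit check for.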

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
new file mode 100644
index 0000000..8f06e7b
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/factory/ValuesWriterFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.values.ValuesWriter;
+
+/**
+ * Can be overridden to allow users to manually test different strategies to create ValuesWriters.
+ * To do this, the ValuesWriterFactory to be used must be passed to the {@link org.apache.parquet.column.ParquetProperties.Builder}.
+ * <ul>Lifecycle of ValuesWriterFactories is:
+ * <li> Initialized while creating a {@link org.apache.parquet.column.ParquetProperties} using the Builder</li>
+ * <li> If the factory must read Hadoop config, it needs to implement the Configurable interface.
+ * In addition to that, ParquetOutputFormat needs to be updated to pass in the Hadoop config via the setConf()
+ * method on the Configurable interface.</li>
+ * <li> newValuesWriter is called once per column for every block of data.</li>
+ * </ul>
+ */
+public interface ValuesWriterFactory {
+
+  /**
+   * Used to initialize the factory. This method is called before newValuesWriter()
+   */
+  void initialize(ParquetProperties parquetProperties);
+
+  /**
+   * Creates a ValuesWriter to write values for the given column.
+   */
+  ValuesWriter newValuesWriter(ColumnDescriptor descriptor);
+}
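
The lifecycle described in the Javadoc above (initialize while the properties are built, then newValuesWriter per column) can be sketched as follows. This is only an illustration under stated assumptions: SketchProperties, SketchFactory, RecordingFactory and LifecycleDemo are hypothetical stand-ins for ParquetProperties and ValuesWriterFactory, and the call to initialize() inside build() is inferred from the Javadoc and from the way DefaultValuesWriterFactoryTest uses the builder, not copied from the real Builder.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ValuesWriterFactory; writers are reduced to strings.
interface SketchFactory {
  void initialize(SketchProperties properties);
  String newValuesWriter(String columnType);
}

// Hypothetical stand-in for ParquetProperties carrying a single setting.
final class SketchProperties {
  final boolean enableDictionary;

  private SketchProperties(boolean enableDictionary) {
    this.enableDictionary = enableDictionary;
  }

  static Builder builder() { return new Builder(); }

  static final class Builder {
    private boolean enableDictionary = true;
    private SketchFactory factory = new RecordingFactory();

    Builder withDictionaryEncoding(boolean enable) { this.enableDictionary = enable; return this; }
    Builder withValuesWriterFactory(SketchFactory factory) { this.factory = factory; return this; }

    SketchProperties build() {
      SketchProperties properties = new SketchProperties(enableDictionary);
      factory.initialize(properties); // assumed lifecycle step: factory is initialized on build()
      return properties;
    }
  }
}

// Records the order of calls so the lifecycle can be inspected.
final class RecordingFactory implements SketchFactory {
  final List<String> calls = new ArrayList<>();
  private SketchProperties properties;

  @Override public void initialize(SketchProperties properties) {
    this.properties = properties;
    calls.add("initialize");
  }

  @Override public String newValuesWriter(String columnType) {
    if (properties == null) {
      throw new IllegalStateException("initialize() must be called before newValuesWriter()");
    }
    calls.add("newValuesWriter:" + columnType);
    return properties.enableDictionary ? "dictionary+fallback" : "fallback-only";
  }
}

final class LifecycleDemo {
  public static void main(String[] args) {
    RecordingFactory factory = new RecordingFactory();
    // The built properties are discarded; building them is what initializes the factory.
    SketchProperties.builder()
        .withDictionaryEncoding(false)
        .withValuesWriterFactory(factory)
        .build();
    System.out.println(factory.newValuesWriter("INT32"));
    System.out.println(factory.calls);
  }
}
```

The getDefaultFactory helper in DefaultValuesWriterFactoryTest appears to rely on the same side effect: it builds the properties only to get the factory initialized, then returns the factory.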

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
new file mode 100644
index 0000000..d6865e2
--- /dev/null
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/factory/DefaultValuesWriterFactoryTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.values.factory;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.column.ParquetProperties.WriterVersion;
+import org.apache.parquet.column.values.ValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
+import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong;
+import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter;
+import org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.*;
+import org.apache.parquet.column.values.fallback.FallbackValuesWriter;
+import org.apache.parquet.column.values.plain.BooleanPlainValuesWriter;
+import org.apache.parquet.column.values.plain.FixedLenByteArrayPlainValuesWriter;
+import org.apache.parquet.column.values.plain.PlainValuesWriter;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class DefaultValuesWriterFactoryTest {
+
+  @Test
+  public void testBoolean() {
+    doTestValueWriter(
+      PrimitiveTypeName.BOOLEAN,
+      WriterVersion.PARQUET_1_0,
+      true,
+      BooleanPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBoolean_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.BOOLEAN,
+      WriterVersion.PARQUET_2_0,
+      true,
+      RunLengthBitPackingHybridValuesWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_1_0,
+      true,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_2_0,
+      true,
+      DictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testFixedLenByteArray_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testBinary() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainBinaryDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBinary_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testBinary_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainBinaryDictionaryValuesWriter.class, DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testBinary_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.BINARY,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaByteArrayWriter.class);
+  }
+
+  @Test
+  public void testInt32() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainIntegerDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainIntegerDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt32_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT32,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaBinaryPackingValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainLongDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt64_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainLongDictionaryValuesWriter.class, DeltaBinaryPackingValuesWriterForLong.class);
+  }
+
+  @Test
+  public void testInt64_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT64,
+      WriterVersion.PARQUET_2_0,
+      false,
+      DeltaBinaryPackingValuesWriterForLong.class);
+  }
+
+  @Test
+  public void testInt96() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainFixedLenArrayDictionaryValuesWriter.class, FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_1_0,
+      false,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainFixedLenArrayDictionaryValuesWriter.class, FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testInt96_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.INT96,
+      WriterVersion.PARQUET_2_0,
+      false,
+      FixedLenByteArrayPlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainDoubleDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainDoubleDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testDouble_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.DOUBLE,
+      WriterVersion.PARQUET_2_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_1_0,
+      true,
+      PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_1_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_V2() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_2_0,
+      true,
+      PlainFloatDictionaryValuesWriter.class, PlainValuesWriter.class);
+  }
+
+  @Test
+  public void testFloat_V2_NoDict() {
+    doTestValueWriter(
+      PrimitiveTypeName.FLOAT,
+      WriterVersion.PARQUET_2_0,
+      false,
+      PlainValuesWriter.class);
+  }
+
+  private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, Class<? extends ValuesWriter> expectedValueWriterClass) {
+    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary);
+    ValuesWriter writer = factory.newValuesWriter(mockPath);
+
+    validateWriterType(writer, expectedValueWriterClass);
+  }
+
+  private void doTestValueWriter(PrimitiveTypeName typeName, WriterVersion version, boolean enableDictionary, Class<? extends ValuesWriter> initialValueWriterClass, Class<? extends ValuesWriter> fallbackValueWriterClass) {
+    ColumnDescriptor mockPath = getMockColumn(typeName);
+    ValuesWriterFactory factory = getDefaultFactory(version, enableDictionary);
+    ValuesWriter writer = factory.newValuesWriter(mockPath);
+
+    validateFallbackWriter(writer, initialValueWriterClass, fallbackValueWriterClass);
+  }
+
+  private ColumnDescriptor getMockColumn(PrimitiveTypeName typeName) {
+    ColumnDescriptor mockPath = mock(ColumnDescriptor.class);
+    when(mockPath.getType()).thenReturn(typeName);
+    return mockPath;
+  }
+
+  private ValuesWriterFactory getDefaultFactory(WriterVersion writerVersion, boolean enableDictionary) {
+    ValuesWriterFactory factory = new DefaultValuesWriterFactory();
+    ParquetProperties.builder()
+      .withDictionaryEncoding(enableDictionary)
+      .withWriterVersion(writerVersion)
+      .withValuesWriterFactory(factory)
+      .build();
+
+    return factory;
+  }
+
+  private void validateWriterType(ValuesWriter writer, Class<? extends ValuesWriter> valuesWriterClass) {
+    assertTrue("Not instance of: " + valuesWriterClass.getName(), valuesWriterClass.isInstance(writer));
+  }
+
+  private void validateFallbackWriter(ValuesWriter writer, Class<? extends ValuesWriter> initialWriterClass, Class<? extends ValuesWriter> fallbackWriterClass) {
+    validateWriterType(writer, FallbackValuesWriter.class);
+
+    FallbackValuesWriter wr = (FallbackValuesWriter) writer;
+    validateWriterType(wr.initialWriter, initialWriterClass);
+    validateWriterType(wr.fallBackWriter, fallbackWriterClass);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index 4c43182..79ca7f4 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -83,13 +83,13 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 6cfa8e9..d05d41f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -317,7 +317,6 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     return conf.getInt(MAX_PADDING_BYTES, DEFAULT_MAX_PADDING_SIZE);
   }
 
-
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -374,16 +373,18 @@ public class ParquetOutputFormat<T> extends FileOutputFormat<Void, T> {
     int maxPaddingSize = getMaxPaddingSize(conf);
     boolean validating = getValidation(conf);
 
-    if (INFO) LOG.info("Parquet block size to " + blockSize);
-    if (INFO) LOG.info("Parquet page size to " + props.getPageSizeThreshold());
-    if (INFO) LOG.info("Parquet dictionary page size to " + props.getDictionaryPageSizeThreshold());
-    if (INFO) LOG.info("Dictionary is " + (props.isEnableDictionary() ? "on" : "off"));
-    if (INFO) LOG.info("Validation is " + (validating ? "on" : "off"));
-    if (INFO) LOG.info("Writer version is: " + props.getWriterVersion());
-    if (INFO) LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
-    if (INFO) LOG.info("Page size checking is: " + (props.estimateNextSizeCheck() ? "estimated" : "constant"));
-    if (INFO) LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
-    if (INFO) LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
+    if (INFO) {
+      LOG.info("Parquet block size to " + blockSize);
+      LOG.info("Parquet page size to " + props.getPageSizeThreshold());
+      LOG.info("Parquet dictionary page size to " + props.getDictionaryPageSizeThreshold());
+      LOG.info("Dictionary is " + (props.isEnableDictionary() ? "on" : "off"));
+      LOG.info("Validation is " + (validating ? "on" : "off"));
+      LOG.info("Writer version is: " + props.getWriterVersion());
+      LOG.info("Maximum row group padding size is " + maxPaddingSize + " bytes");
+      LOG.info("Page size checking is: " + (props.estimateNextSizeCheck() ? "estimated" : "constant"));
+      LOG.info("Min row count for page size check is: " + props.getMinRowCountForPageSizeCheck());
+      LOG.info("Max row count for page size check is: " + props.getMaxRowCountForPageSizeCheck());
+    }
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-pig/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index 9b6371e..4142c3b 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -101,7 +101,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-protobuf/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml
index b3e4e50..0c9cae4 100644
--- a/parquet-protobuf/pom.xml
+++ b/parquet-protobuf/pom.xml
@@ -42,7 +42,7 @@
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-core</artifactId>
-      <version>1.9.5</version>
+      <version>${mockito.version}</version>
       <scope>test</scope>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/parquet-tools/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-tools/pom.xml b/parquet-tools/pom.xml
index 5d3f7c9..66abaa9 100644
--- a/parquet-tools/pom.xml
+++ b/parquet-tools/pom.xml
@@ -70,7 +70,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>11.0</version>
+      <version>${guava.version}</version>
     </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/30aa9101/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 510c329..ccc4675 100644
--- a/pom.xml
+++ b/pom.xml
@@ -86,6 +86,8 @@
     <semver.api.version>0.9.33</semver.api.version>
     <slf4j.version>1.7.5</slf4j.version>
     <avro.version>1.8.0</avro.version>
+    <guava.version>11.0</guava.version>
+    <mockito.version>1.9.5</mockito.version>
   </properties>
 
   <modules>
