[ 
https://issues.apache.org/jira/browse/PARQUET-1253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16488839#comment-16488839
 ] 

ASF GitHub Bot commented on PARQUET-1253:
-----------------------------------------

gszadovszky closed pull request #463: PARQUET-1253: Support for new logical 
type representation
URL: https://github.com/apache/parquet-mr/pull/463
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
 
b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
index 7b9f817e3..97b2ccf99 100644
--- 
a/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
+++ 
b/parquet-cascading3/src/test/java/org/apache/parquet/cascading/TestParquetTBaseScheme.java
@@ -40,14 +40,12 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.thrift.protocol.TCompactProtocol;
 import org.apache.thrift.protocol.TProtocol;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TIOStreamTransport;
 import org.junit.Test;
-import static org.junit.Assert.*;
 
 import org.apache.parquet.hadoop.thrift.ThriftToParquetFileWriter;
 import org.apache.parquet.hadoop.util.ContextUtil;
@@ -55,8 +53,9 @@
 
 import java.io.File;
 import java.io.ByteArrayOutputStream;
-import java.util.HashMap;
-import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class TestParquetTBaseScheme {
   final String txtInputPath = "target/test-classes/names.txt";
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
index 68dba979b..3ff25b6db 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/GroupType.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -47,7 +47,7 @@
    * @param fields the contained fields
    */
   public GroupType(Repetition repetition, String name, List<Type> fields) {
-    this(repetition, name, null, fields, null);
+    this(repetition, name, (LogicalTypeAnnotation) null, fields, null);
   }
 
   /**
@@ -97,6 +97,15 @@ public GroupType(Repetition repetition, String name, 
OriginalType originalType,
     }
   }
 
+  GroupType(Repetition repetition, String name, LogicalTypeAnnotation 
logicalTypeAnnotation, List<Type> fields, ID id) {
+    super(name, repetition, logicalTypeAnnotation, id);
+    this.fields = fields;
+    this.indexByName = new HashMap<String, Integer>();
+    for (int i = 0; i < fields.size(); i++) {
+      indexByName.put(fields.get(i).getName(), i);
+    }
+  }
+
   /**
    * @param id the field id
    * @return a new GroupType with the same fields and a new id
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
 
b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
new file mode 100644
index 000000000..e22867aec
--- /dev/null
+++ 
b/parquet-column/src/main/java/org/apache/parquet/schema/LogicalTypeAnnotation.java
@@ -0,0 +1,878 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.schema;
+
+import org.apache.parquet.Preconditions;
+
+import java.util.List;
+import java.util.Objects;
+
+public abstract class LogicalTypeAnnotation {
+  enum LogicalTypeToken {
+    MAP {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return mapType();
+      }
+    },
+    LIST {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return listType();
+      }
+    },
+    UTF8 {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return stringType();
+      }
+    },
+    MAP_KEY_VALUE {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return MapKeyValueTypeAnnotation.getInstance();
+      }
+    },
+    ENUM {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return enumType();
+      }
+    },
+    DECIMAL {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for decimal 
logical type, got " + params.size());
+        }
+        return decimalType(Integer.valueOf(params.get(1)), 
Integer.valueOf(params.get(0)));
+      }
+    },
+    DATE {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return dateType();
+      }
+    },
+    TIME {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for time logical 
type, got " + params.size());
+        }
+        return timeType(Boolean.parseBoolean(params.get(1)), 
TimeUnit.valueOf(params.get(0)));
+      }
+    },
+    TIMESTAMP {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for timestamp 
logical type, got " + params.size());
+        }
+        return timestampType(Boolean.parseBoolean(params.get(1)), 
TimeUnit.valueOf(params.get(0)));
+      }
+    },
+    INT {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        if (params.size() != 2) {
+          throw new RuntimeException("Expecting 2 parameters for integer 
logical type, got " + params.size());
+        }
+        return intType(Integer.valueOf(params.get(0)), 
Boolean.parseBoolean(params.get(1)));
+      }
+    },
+    JSON {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return jsonType();
+      }
+    },
+    BSON {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return bsonType();
+      }
+    },
+    INTERVAL {
+      @Override
+      protected LogicalTypeAnnotation fromString(List<String> params) {
+        return IntervalLogicalTypeAnnotation.getInstance();
+      }
+    };
+
+    protected abstract LogicalTypeAnnotation fromString(List<String> params);
+  }
+
+  /**
+   * Convert this logical type to old logical type representation in 
parquet-mr (if there's any).
+   * Those logical type implementations, which don't have a corresponding 
mapping should return null.
+   *
+   * @return the OriginalType representation of the new logical type, or null 
if there's none
+   */
+  public abstract OriginalType toOriginalType();
+
+  /**
+   * Visits this logical type with the given visitor
+   *
+   * @param logicalTypeAnnotationVisitor the visitor to visit this type
+   */
+  public abstract void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor);
+
+  abstract LogicalTypeToken getType();
+
+  String typeParametersAsString() {
+    return "";
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(getType());
+    sb.append(typeParametersAsString());
+    return sb.toString();
+  }
+
+  /**
+   * Helper method to convert the old representation of logical types 
(OriginalType) to new logical type.
+   */
+  public static LogicalTypeAnnotation fromOriginalType(OriginalType 
originalType, DecimalMetadata decimalMetadata) {
+    if (originalType == null) {
+      return null;
+    }
+    switch (originalType) {
+      case UTF8:
+        return stringType();
+      case MAP:
+        return mapType();
+      case DECIMAL:
+        int scale = (decimalMetadata == null ? 0 : decimalMetadata.getScale());
+        int precision = (decimalMetadata == null ? 0 : 
decimalMetadata.getPrecision());
+        return decimalType(scale, precision);
+      case LIST:
+        return listType();
+      case DATE:
+        return dateType();
+      case INTERVAL:
+        return IntervalLogicalTypeAnnotation.getInstance();
+      case TIMESTAMP_MILLIS:
+        return timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+      case TIMESTAMP_MICROS:
+        return timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+      case TIME_MILLIS:
+        return timeType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+      case TIME_MICROS:
+        return timeType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+      case UINT_8:
+        return intType(8, false);
+      case UINT_16:
+        return intType(16, false);
+      case UINT_32:
+        return intType(32, false);
+      case UINT_64:
+        return intType(64, false);
+      case INT_8:
+        return intType(8, true);
+      case INT_16:
+        return intType(16, true);
+      case INT_32:
+        return intType(32, true);
+      case INT_64:
+        return intType(64, true);
+      case ENUM:
+        return enumType();
+      case JSON:
+        return jsonType();
+      case BSON:
+        return bsonType();
+      case MAP_KEY_VALUE:
+        return MapKeyValueTypeAnnotation.getInstance();
+      default:
+        throw new RuntimeException("Can't convert original type to logical 
type, unknown original type " + originalType);
+    }
+  }
+
+  public static StringLogicalTypeAnnotation stringType() {
+    return StringLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static MapLogicalTypeAnnotation mapType() {
+    return MapLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static ListLogicalTypeAnnotation listType() {
+    return ListLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static EnumLogicalTypeAnnotation enumType() {
+    return EnumLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static DecimalLogicalTypeAnnotation decimalType(final int scale, 
final int precision) {
+    return new DecimalLogicalTypeAnnotation(scale, precision);
+  }
+
+  public static DateLogicalTypeAnnotation dateType() {
+    return DateLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static TimeLogicalTypeAnnotation timeType(final boolean 
isAdjustedToUTC, final TimeUnit unit) {
+    return new TimeLogicalTypeAnnotation(isAdjustedToUTC, unit);
+  }
+
+  public static TimestampLogicalTypeAnnotation timestampType(final boolean 
isAdjustedToUTC, final TimeUnit unit) {
+    return new TimestampLogicalTypeAnnotation(isAdjustedToUTC, unit);
+  }
+
+  public static IntLogicalTypeAnnotation intType(final int bitWidth, final 
boolean isSigned) {
+    Preconditions.checkArgument(
+      bitWidth == 8 || bitWidth == 16 || bitWidth == 32 || bitWidth == 64,
+      "Invalid bit width for integer logical type, " + bitWidth + " is not 
allowed, " +
+        "valid bit width values: 8, 16, 32, 64");
+    return new IntLogicalTypeAnnotation(bitWidth, isSigned);
+  }
+
+  public static JsonLogicalTypeAnnotation jsonType() {
+    return JsonLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static BsonLogicalTypeAnnotation bsonType() {
+    return BsonLogicalTypeAnnotation.INSTANCE;
+  }
+
+  public static class StringLogicalTypeAnnotation extends 
LogicalTypeAnnotation {
+    private static final StringLogicalTypeAnnotation INSTANCE = new 
StringLogicalTypeAnnotation();
+
+    private StringLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.UTF8;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.UTF8;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof StringLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class MapLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final MapLogicalTypeAnnotation INSTANCE = new 
MapLogicalTypeAnnotation();
+
+    private MapLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.MAP;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.MAP;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof MapLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class ListLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final ListLogicalTypeAnnotation INSTANCE = new 
ListLogicalTypeAnnotation();
+
+    private ListLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.LIST;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.LIST;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof ListLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class EnumLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final EnumLogicalTypeAnnotation INSTANCE = new 
EnumLogicalTypeAnnotation();
+
+    private EnumLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.ENUM;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.ENUM;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof EnumLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class DecimalLogicalTypeAnnotation extends 
LogicalTypeAnnotation {
+    private final int scale;
+    private final int precision;
+
+    private DecimalLogicalTypeAnnotation(int scale, int precision) {
+      this.scale = scale;
+      this.precision = precision;
+    }
+
+    public int getPrecision() {
+      return precision;
+    }
+
+    public int getScale() {
+      return scale;
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.DECIMAL;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.DECIMAL;
+    }
+
+    @Override
+    protected String typeParametersAsString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(precision);
+      sb.append(",");
+      sb.append(scale);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof DecimalLogicalTypeAnnotation)) {
+        return false;
+      }
+      DecimalLogicalTypeAnnotation other = (DecimalLogicalTypeAnnotation) obj;
+      return scale == other.scale && precision == other.precision;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(scale, precision);
+    }
+  }
+
+  public static class DateLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final DateLogicalTypeAnnotation INSTANCE = new 
DateLogicalTypeAnnotation();
+
+    private DateLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.DATE;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.DATE;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof DateLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public enum TimeUnit {
+    MILLIS,
+    MICROS
+  }
+
+  public static class TimeLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private final boolean isAdjustedToUTC;
+    private final TimeUnit unit;
+
+    private TimeLogicalTypeAnnotation(boolean isAdjustedToUTC, TimeUnit unit) {
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.unit = unit;
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      switch (unit) {
+        case MILLIS:
+          return OriginalType.TIME_MILLIS;
+        case MICROS:
+          return OriginalType.TIME_MICROS;
+        default:
+          throw new RuntimeException("Unknown original type for " + unit);
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.TIME;
+    }
+
+    @Override
+    protected String typeParametersAsString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(unit);
+      sb.append(",");
+      sb.append(isAdjustedToUTC);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public TimeUnit getUnit() {
+      return unit;
+    }
+
+    public boolean isAdjustedToUTC() {
+      return isAdjustedToUTC;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TimeLogicalTypeAnnotation)) {
+        return false;
+      }
+      TimeLogicalTypeAnnotation other = (TimeLogicalTypeAnnotation) obj;
+      return isAdjustedToUTC == other.isAdjustedToUTC && unit == other.unit;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(isAdjustedToUTC, unit);
+    }
+  }
+
+  public static class TimestampLogicalTypeAnnotation extends 
LogicalTypeAnnotation {
+    private final boolean isAdjustedToUTC;
+    private final TimeUnit unit;
+
+    private TimestampLogicalTypeAnnotation(boolean isAdjustedToUTC, TimeUnit 
unit) {
+      this.isAdjustedToUTC = isAdjustedToUTC;
+      this.unit = unit;
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      switch (unit) {
+        case MILLIS:
+          return OriginalType.TIMESTAMP_MILLIS;
+        case MICROS:
+          return OriginalType.TIMESTAMP_MICROS;
+        default:
+          throw new RuntimeException("Unknown original type for " + unit);
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.TIMESTAMP;
+    }
+
+    @Override
+    protected String typeParametersAsString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(unit);
+      sb.append(",");
+      sb.append(isAdjustedToUTC);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public TimeUnit getUnit() {
+      return unit;
+    }
+
+    public boolean isAdjustedToUTC() {
+      return isAdjustedToUTC;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof TimestampLogicalTypeAnnotation)) {
+        return false;
+      }
+      TimestampLogicalTypeAnnotation other = (TimestampLogicalTypeAnnotation) 
obj;
+      return (isAdjustedToUTC == other.isAdjustedToUTC) && (unit == 
other.unit);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(isAdjustedToUTC, unit);
+    }
+  }
+
+  public static class IntLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private final int bitWidth;
+    private final boolean isSigned;
+
+
+    private IntLogicalTypeAnnotation(int bitWidth, boolean isSigned) {
+      this.bitWidth = bitWidth;
+      this.isSigned = isSigned;
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      switch (bitWidth) {
+        case 8:
+          return isSigned ? OriginalType.INT_8 : OriginalType.UINT_8;
+        case 16:
+          return isSigned ? OriginalType.INT_16 : OriginalType.UINT_16;
+        case 32:
+          return isSigned ? OriginalType.INT_32 : OriginalType.UINT_32;
+        case 64:
+          return isSigned ? OriginalType.INT_64 : OriginalType.UINT_64;
+        default:
+          throw new RuntimeException("Unknown original type " + 
toOriginalType());
+      }
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.INT;
+    }
+
+    @Override
+    protected String typeParametersAsString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("(");
+      sb.append(bitWidth);
+      sb.append(",");
+      sb.append(isSigned);
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public int getBitWidth() {
+      return bitWidth;
+    }
+
+    public boolean isSigned() {
+      return isSigned;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof IntLogicalTypeAnnotation)) {
+        return false;
+      }
+      IntLogicalTypeAnnotation other = (IntLogicalTypeAnnotation) obj;
+      return (bitWidth == other.bitWidth) && (isSigned == other.isSigned);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(bitWidth, isSigned);
+    }
+  }
+
+  public static class JsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final JsonLogicalTypeAnnotation INSTANCE = new 
JsonLogicalTypeAnnotation();
+
+    private JsonLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.JSON;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.JSON;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof JsonLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  public static class BsonLogicalTypeAnnotation extends LogicalTypeAnnotation {
+    private static final BsonLogicalTypeAnnotation INSTANCE = new 
BsonLogicalTypeAnnotation();
+
+    private BsonLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.BSON;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.BSON;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof BsonLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  // This logical type annotation is implemented to support backward 
compatibility with ConvertedType.
+  // The new logical type representation in parquet-format doesn't have any 
interval type,
+  // thus this annotation is mapped to UNKNOWN.
+  public static class IntervalLogicalTypeAnnotation extends 
LogicalTypeAnnotation {
+    private static IntervalLogicalTypeAnnotation INSTANCE = new 
IntervalLogicalTypeAnnotation();
+
+    public static LogicalTypeAnnotation getInstance() {
+      return INSTANCE;
+    }
+
+    private IntervalLogicalTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.INTERVAL;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.INTERVAL;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof IntervalLogicalTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  // This logical type annotation is implemented to support backward 
compatibility with ConvertedType.
+  // The new logical type representation in parquet-format doesn't have any 
key-value type,
+  // thus this annotation is mapped to UNKNOWN. This type shouldn't be used.
+  public static class MapKeyValueTypeAnnotation extends LogicalTypeAnnotation {
+    private static MapKeyValueTypeAnnotation INSTANCE = new 
MapKeyValueTypeAnnotation();
+
+    public static MapKeyValueTypeAnnotation getInstance() {
+      return INSTANCE;
+    }
+
+    private MapKeyValueTypeAnnotation() {
+    }
+
+    @Override
+    public OriginalType toOriginalType() {
+      return OriginalType.MAP_KEY_VALUE;
+    }
+
+    @Override
+    public void accept(LogicalTypeAnnotationVisitor 
logicalTypeAnnotationVisitor) {
+      logicalTypeAnnotationVisitor.visit(this);
+    }
+
+    @Override
+    LogicalTypeToken getType() {
+      return LogicalTypeToken.MAP_KEY_VALUE;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      return obj instanceof MapKeyValueTypeAnnotation;
+    }
+
+    @Override
+    public int hashCode() {
+      // This type doesn't have any parameters, thus using class hashcode
+      return getClass().hashCode();
+    }
+  }
+
+  /**
+   * Implement this interface to visit a logical type annotation in the schema.
+   * The default implementation for each logical type specific visitor method 
is empty.
+   * <p>
+   * Example usage: logicalTypeAnnotation.accept(new 
LogicalTypeAnnotationVisitor() { ... });
+   */
+  public interface LogicalTypeAnnotationVisitor {
+    default void visit(StringLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(MapLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(ListLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(EnumLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(DecimalLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(DateLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(TimeLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(TimestampLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(IntLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(JsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(BsonLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(IntervalLogicalTypeAnnotation logicalTypeAnnotation) {
+    }
+
+    default void visit(MapKeyValueTypeAnnotation logicalTypeAnnotation) {
+    }
+  }
+}
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
index f0c178af6..a3051eae3 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/schema/MessageTypeParser.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,7 +18,9 @@
  */
 package org.apache.parquet.schema;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
 import java.util.StringTokenizer;
 
@@ -161,25 +163,44 @@ private static void addPrimitiveType(Tokenizer st, 
PrimitiveTypeName type, Repet
     t = st.nextToken();
     OriginalType originalType = null;
     if (t.equalsIgnoreCase("(")) {
-      originalType = OriginalType.valueOf(st.nextToken());
-      childBuilder.as(originalType);
-      if (OriginalType.DECIMAL == originalType) {
+      t = st.nextToken();
+      if (isLogicalType(t)) {
+        LogicalTypeAnnotation.LogicalTypeToken logicalType = 
LogicalTypeAnnotation.LogicalTypeToken.valueOf(t);
         t = st.nextToken();
-        // parse precision and scale
-        if (t.equalsIgnoreCase("(")) {
-          childBuilder.precision(Integer.parseInt(st.nextToken()));
-          t = st.nextToken();
-          if (t.equalsIgnoreCase(",")) {
-            childBuilder.scale(Integer.parseInt(st.nextToken()));
+        List<String> tokens = new ArrayList<>();
+        if ("(".equals(t)) {
+          while (!")".equals(t)) {
+            if (!(",".equals(t) || "(".equals(t) || ")".equals(t))) {
+              tokens.add(t);
+            }
             t = st.nextToken();
           }
-          check(t, ")", "decimal type ended by )", st);
           t = st.nextToken();
         }
+        LogicalTypeAnnotation logicalTypeAnnotation = 
logicalType.fromString(tokens);
+        childBuilder.as(logicalTypeAnnotation);
       } else {
-        t = st.nextToken();
+        // Try to parse as old logical type, called OriginalType
+        originalType = OriginalType.valueOf(t);
+        childBuilder.as(originalType);
+        if (OriginalType.DECIMAL == originalType) {
+          t = st.nextToken();
+          // parse precision and scale
+          if (t.equalsIgnoreCase("(")) {
+            childBuilder.precision(Integer.parseInt(st.nextToken()));
+            t = st.nextToken();
+            if (t.equalsIgnoreCase(",")) {
+              childBuilder.scale(Integer.parseInt(st.nextToken()));
+              t = st.nextToken();
+            }
+            check(t, ")", "decimal type ended by )", st);
+            t = st.nextToken();
+          }
+        } else {
+          t = st.nextToken();
+        }
       }
-      check(t, ")", "original type ended by )", st);
+      check(t, ")", "logical type ended by )", st);
       t = st.nextToken();
     }
     if (t.equals("=")) {
@@ -195,6 +216,10 @@ private static void addPrimitiveType(Tokenizer st, 
PrimitiveTypeName type, Repet
     }
   }
 
+  private static boolean isLogicalType(String t) {
+    return 
Arrays.stream(LogicalTypeAnnotation.LogicalTypeToken.values()).anyMatch((type) 
-> type.name().equals(t));
+  }
+
   private static PrimitiveTypeName asPrimitive(String t, Tokenizer st) {
     try {
       return PrimitiveTypeName.valueOf(t.toUpperCase(Locale.ENGLISH));
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index a4211738b..369b277b8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -389,9 +389,8 @@ abstract public void addValueToPrimitiveConverter(
    * @param primitive STRING, INT64, ...
    * @param name the name of the type
    */
-  public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
-                       String name) {
-    this(repetition, primitive, 0, name, null, null, null);
+  public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive, 
String name) {
+    this(repetition, primitive, 0, name, (LogicalTypeAnnotation) null, null, 
null);
   }
 
   /**
@@ -401,7 +400,7 @@ public PrimitiveType(Repetition repetition, 
PrimitiveTypeName primitive,
    * @param name the name of the type
    */
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive, int 
length, String name) {
-    this(repetition, primitive, length, name, null, null, null);
+    this(repetition, primitive, length, name, (LogicalTypeAnnotation) null, 
null, null);
   }
 
   /**
@@ -409,7 +408,10 @@ public PrimitiveType(Repetition repetition, 
PrimitiveTypeName primitive, int len
    * @param primitive STRING, INT64, ...
    * @param name the name of the type
    * @param originalType (optional) the original type to help with cross 
schema convertion (LIST, MAP, ...)
+   *
+   * @deprecated will be removed in 2.0.0; use builders in {@link Types} 
instead
    */
+  @Deprecated
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
                        String name, OriginalType originalType) {
     this(repetition, primitive, 0, name, originalType, null, null);
@@ -436,7 +438,10 @@ public PrimitiveType(Repetition repetition, 
PrimitiveTypeName primitive,
    * @param originalType (optional) the original type (MAP, DECIMAL, UTF8, ...)
    * @param decimalMeta (optional) metadata about the decimal type
    * @param id the id of the field
+   *
+   * @deprecated will be removed in 2.0.0; use builders in {@link Types} 
instead
    */
+  @Deprecated
   public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
                        int length, String name, OriginalType originalType,
                        DecimalMetadata decimalMeta, ID id) {
@@ -446,7 +451,7 @@ public PrimitiveType(Repetition repetition, 
PrimitiveTypeName primitive,
   PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
       int length, String name, OriginalType originalType,
       DecimalMetadata decimalMeta, ID id, ColumnOrder columnOrder) {
-    super(name, repetition, originalType, id);
+    super(name, repetition, originalType, decimalMeta, id);
     this.primitive = primitive;
     this.length = length;
     this.decimalMeta = decimalMeta;
@@ -459,6 +464,37 @@ public PrimitiveType(Repetition repetition, 
PrimitiveTypeName primitive,
     this.columnOrder = requireValidColumnOrder(columnOrder);
   }
 
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                       String name, LogicalTypeAnnotation 
logicalTypeAnnotation) {
+    this(repetition, primitive, 0, name, logicalTypeAnnotation, null, null);
+  }
+
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                       int length, String name, LogicalTypeAnnotation 
logicalTypeAnnotation, ID id) {
+    this(repetition, primitive, length, name, logicalTypeAnnotation, id, null);
+  }
+
+  PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                int length, String name, LogicalTypeAnnotation 
logicalTypeAnnotation,
+                ID id, ColumnOrder columnOrder) {
+    super(name, repetition, logicalTypeAnnotation, id);
+    this.primitive = primitive;
+    this.length = length;
+    if (getOriginalType() == OriginalType.DECIMAL) {
+      LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = 
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
+      this.decimalMeta = new DecimalMetadata(decimal.getPrecision(), 
decimal.getScale());
+    } else {
+      this.decimalMeta = null;
+    }
+
+    if (columnOrder == null) {
+      columnOrder = primitive == PrimitiveTypeName.INT96 || getOriginalType() 
== OriginalType.INTERVAL
+        ? ColumnOrder.undefined()
+        : ColumnOrder.typeDefined();
+    }
+    this.columnOrder = requireValidColumnOrder(columnOrder);
+  }
+
   private ColumnOrder requireValidColumnOrder(ColumnOrder columnOrder) {
     if (primitive == PrimitiveTypeName.INT96) {
       Preconditions.checkArgument(columnOrder.getColumnOrderName() == 
ColumnOrderName.UNDEFINED,
@@ -557,17 +593,9 @@ public void writeToStringBuilder(StringBuilder sb, String 
indent) {
       sb.append("(" + length + ")");
     }
     sb.append(" ").append(getName());
-    if (getOriginalType() != null) {
-      sb.append(" (").append(getOriginalType());
-      DecimalMetadata meta = getDecimalMetadata();
-      if (meta != null) {
-        sb.append("(")
-            .append(meta.getPrecision())
-            .append(",")
-            .append(meta.getScale())
-            .append(")");
-      }
-      sb.append(")");
+    if (getLogicalTypeAnnotation() != null) {
+      // TODO: should we print decimal metadata too?
+      sb.append(" 
(").append(getLogicalTypeAnnotation().toString()).append(")");
     }
     if (getId() != null) {
       sb.append(" = ").append(getId());
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
index dd2c38da3..69374a465 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Type.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -119,7 +119,7 @@ public boolean isMoreRestrictiveThan(Repetition other) {
 
   private final String name;
   private final Repetition repetition;
-  private final OriginalType originalType;
+  private final LogicalTypeAnnotation logicalTypeAnnotation;
   private final ID id;
 
   /**
@@ -128,7 +128,7 @@ public boolean isMoreRestrictiveThan(Repetition other) {
    */
   @Deprecated
   public Type(String name, Repetition repetition) {
-    this(name, repetition, null, null);
+    this(name, repetition, (LogicalTypeAnnotation) null, null);
   }
 
   /**
@@ -148,10 +148,26 @@ public Type(String name, Repetition repetition, 
OriginalType originalType) {
    * @param id (optional) the id of the fields.
    */
   Type(String name, Repetition repetition, OriginalType originalType, ID id) {
+    this(name, repetition, originalType, null, id);
+  }
+
+  Type(String name, Repetition repetition, OriginalType originalType, 
DecimalMetadata decimalMetadata, ID id) {
+    super();
+    this.name = checkNotNull(name, "name");
+    this.repetition = checkNotNull(repetition, "repetition");
+    this.logicalTypeAnnotation = originalType == null ? null : 
LogicalTypeAnnotation.fromOriginalType(originalType, decimalMetadata);
+    this.id = id;
+  }
+
+  Type(String name, Repetition repetition, LogicalTypeAnnotation 
logicalTypeAnnotation) {
+    this(name, repetition, logicalTypeAnnotation, null);
+  }
+
+  Type(String name, Repetition repetition, LogicalTypeAnnotation 
logicalTypeAnnotation, ID id) {
     super();
     this.name = checkNotNull(name, "name");
     this.repetition = checkNotNull(repetition, "repetition");
-    this.originalType = originalType;
+    this.logicalTypeAnnotation = logicalTypeAnnotation;
     this.id = id;
   }
 
@@ -190,11 +206,15 @@ public ID getId() {
     return id;
   }
 
+  public LogicalTypeAnnotation getLogicalTypeAnnotation() {
+    return logicalTypeAnnotation;
+  }
+
   /**
    * @return the original type (LIST, MAP, ...)
    */
   public OriginalType getOriginalType() {
-    return originalType;
+    return logicalTypeAnnotation == null ? null : 
logicalTypeAnnotation.toOriginalType();
   }
 
   /**
@@ -247,8 +267,8 @@ public PrimitiveType asPrimitiveType() {
   public int hashCode() {
     int c = repetition.hashCode();
     c = 31 * c + name.hashCode();
-    if (originalType != null) {
-      c = 31 * c +  originalType.hashCode();
+    if (logicalTypeAnnotation != null) {
+      c = 31 * c +  logicalTypeAnnotation.hashCode();
     }
     if (id != null) {
       c = 31 * c + id.hashCode();
@@ -262,7 +282,7 @@ protected boolean equals(Type other) {
         && repetition == other.repetition
         && eqOrBothNull(repetition, other.repetition)
         && eqOrBothNull(id, other.id)
-        && eqOrBothNull(originalType, other.originalType);
+        && eqOrBothNull(logicalTypeAnnotation, other.logicalTypeAnnotation);
   };
 
   @Override
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java 
b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
index 0422a9d43..916fb2999 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/Types.java
@@ -198,7 +198,7 @@
     protected final Class<? extends P> returnClass;
 
     protected Type.Repetition repetition = null;
-    protected OriginalType originalType = null;
+    protected LogicalTypeAnnotation logicalTypeAnnotation = null;
     protected Type.ID id = null;
     private boolean repetitionAlreadySet = false;
 
@@ -250,9 +250,32 @@ protected final THIS repetition(Type.Repetition 
repetition) {
      *
      * @param type an {@code OriginalType}
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the 
corresponding logical type instead
      */
+    @Deprecated
     public THIS as(OriginalType type) {
-      this.originalType = type;
+      this.logicalTypeAnnotation = 
LogicalTypeAnnotation.fromOriginalType(type, null);
+      return self();
+    }
+
+    protected boolean newLogicalTypeSet;
+
+    /**
+     * Adds a type annotation ({@link LogicalTypeAnnotation}) to the type 
being built.
+     * <p>
+     * Type annotations are used to extend the types that parquet can store, by
+     * specifying how the primitive types should be interpreted. This keeps the
+     * set of primitive types to a minimum and reuses parquet's efficient
+     * encodings. For example, strings are stored as byte arrays (binary) with
+     * a UTF8 annotation.
+     *
+     * @param type an {@code {@link LogicalTypeAnnotation}}
+     * @return this builder for method chaining
+     */
+    public THIS as(LogicalTypeAnnotation type) {
+      this.logicalTypeAnnotation = type;
+      this.newLogicalTypeSet = true;
       return self();
     }
 
@@ -303,6 +326,9 @@ public P named(String name) {
       }
     }
 
+    protected OriginalType getOriginalType () {
+      return logicalTypeAnnotation == null ? null : 
logicalTypeAnnotation.toOriginalType();
+    }
   }
 
   public abstract static class
@@ -343,6 +369,9 @@ public THIS length(int length) {
       return self();
     }
 
+    private boolean precisionAlreadySet;
+    private boolean scaleAlreadySet;
+
     /**
      * Adds the precision for a DECIMAL.
      * <p>
@@ -352,9 +381,13 @@ public THIS length(int length) {
      *
      * @param precision an int precision value for the DECIMAL
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the 
corresponding decimal type instead
      */
+    @Deprecated
     public THIS precision(int precision) {
       this.precision = precision;
+      precisionAlreadySet = true;
       return self();
     }
 
@@ -370,9 +403,13 @@ public THIS precision(int precision) {
      *
      * @param scale an int scale value for the DECIMAL
      * @return this builder for method chaining
+     *
+     * @deprecated use {@link #as(LogicalTypeAnnotation)} with the 
corresponding decimal type instead
      */
+    @Deprecated
     public THIS scale(int scale) {
       this.scale = scale;
+      scaleAlreadySet = true;
       return self();
     }
 
@@ -402,7 +439,8 @@ protected PrimitiveType build(String name) {
       DecimalMetadata meta = decimalMetadata();
 
       // validate type annotations and required metadata
-      if (originalType != null) {
+      if (logicalTypeAnnotation != null) {
+        OriginalType originalType = logicalTypeAnnotation.toOriginalType();
         switch (originalType) {
           case UTF8:
           case JSON:
@@ -475,7 +513,7 @@ protected PrimitiveType build(String name) {
         }
       }
 
-      return new PrimitiveType(repetition, primitiveType, length, name, 
originalType, meta, id, columnOrder);
+      return new PrimitiveType(repetition, primitiveType, length, name, 
getOriginalType(), meta, id, columnOrder);
     }
 
     private static long maxPrecision(int numBytes) {
@@ -488,12 +526,25 @@ private static long maxPrecision(int numBytes) {
 
     protected DecimalMetadata decimalMetadata() {
       DecimalMetadata meta = null;
-      if (OriginalType.DECIMAL == originalType) {
+      if (OriginalType.DECIMAL == getOriginalType()) {
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType = 
(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalTypeAnnotation;
+        if (newLogicalTypeSet) {
+          if (scaleAlreadySet) {
+            Preconditions.checkArgument(this.scale == decimalType.getScale(),
+              "Decimal scale should match with the scale of the logical type");
+          }
+          if (precisionAlreadySet) {
+            Preconditions.checkArgument(this.precision == 
decimalType.getPrecision(),
+              "Decimal precision should match with the precision of the 
logical type");
+          }
+          scale = decimalType.getScale();
+          precision = decimalType.getPrecision();
+        }
         Preconditions.checkArgument(precision > 0,
             "Invalid DECIMAL precision: " + precision);
-        Preconditions.checkArgument(scale >= 0,
-            "Invalid DECIMAL scale: " + scale);
-        Preconditions.checkArgument(scale <= precision,
+        Preconditions.checkArgument(this.scale >= 0,
+            "Invalid DECIMAL scale: " + this.scale);
+        Preconditions.checkArgument(this.scale <= precision,
             "Invalid DECIMAL scale: cannot be greater than precision");
         meta = new DecimalMetadata(precision, scale);
       }
@@ -648,7 +699,7 @@ public THIS addFields(Type... types) {
 
     @Override
     protected GroupType build(String name) {
-      return new GroupType(repetition, name, originalType, fields, id);
+      return new GroupType(repetition, name, getOriginalType(), fields, id);
     }
 
     public MapBuilder<THIS> map(
@@ -1043,7 +1094,7 @@ public THIS value(Type type) {
 
     @Override
     protected Type build(String name) {
-      Preconditions.checkState(originalType == null,
+      Preconditions.checkState(logicalTypeAnnotation == null,
           "MAP is already a logical type and can't be changed.");
       if (keyType == null) {
         keyType = STRING_KEY;
@@ -1191,7 +1242,7 @@ public LP named(String name) {
 
     @Override
     protected Type build(String name) {
-      Preconditions.checkState(originalType == null,
+      Preconditions.checkState(logicalTypeAnnotation == null,
           "LIST is already the logical type and can't be changed");
       Preconditions.checkNotNull(elementType, "List element type");
 
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java 
b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
index e2f737abf..5082501af 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/parser/TestParquetParser.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,6 +18,10 @@
  */
 package org.apache.parquet.parser;
 
+import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MILLIS;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timeType;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.timestampType;
 import static org.junit.Assert.assertEquals;
 import static org.apache.parquet.schema.MessageTypeParser.parseMessageType;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
@@ -248,6 +252,8 @@ public void testTimeAnnotations() {
         "  required int32 time (TIME_MILLIS);" +
         "  required int64 timestamp (TIMESTAMP_MILLIS);" +
         "  required FIXED_LEN_BYTE_ARRAY(12) interval (INTERVAL);" +
+        "  required int32 newTime (TIME(MILLIS,true));" +
+        "  required int64 newTimestamp (TIMESTAMP(MILLIS,false));" +
         "}\n";
 
     MessageType parsed = MessageTypeParser.parseMessageType(message);
@@ -256,7 +262,9 @@ public void testTimeAnnotations() {
         .required(INT32).as(TIME_MILLIS).named("time")
         .required(INT64).as(TIMESTAMP_MILLIS).named("timestamp")
         
.required(FIXED_LEN_BYTE_ARRAY).length(12).as(INTERVAL).named("interval")
-        .named("TimeMessage");
+        .required(INT32).as(timeType(true, MILLIS)).named("newTime")
+        .required(INT64).as(timestampType(false, MILLIS)).named("newTimestamp")
+      .named("TimeMessage");
 
     assertEquals(expected, parsed);
     MessageType reparsed = 
MessageTypeParser.parseMessageType(parsed.toString());
@@ -293,6 +301,36 @@ public void testIntAnnotations() {
     assertEquals(expected, reparsed);
   }
 
+  @Test
+  public void testIntegerAnnotations() {
+    String message = "message IntMessage {" +
+      "  required int32 i8 (INT(8,true));" +
+      "  required int32 i16 (INT(16,true));" +
+      "  required int32 i32 (INT(32,true));" +
+      "  required int64 i64 (INT(64,true));" +
+      "  required int32 u8 (INT(8,false));" +
+      "  required int32 u16 (INT(16,false));" +
+      "  required int32 u32 (INT(32,false));" +
+      "  required int64 u64 (INT(64,false));" +
+      "}\n";
+
+    MessageType parsed = MessageTypeParser.parseMessageType(message);
+    MessageType expected = Types.buildMessage()
+      .required(INT32).as(intType(8, true)).named("i8")
+      .required(INT32).as(intType(16, true)).named("i16")
+      .required(INT32).as(intType(32, true)).named("i32")
+      .required(INT64).as(intType(64, true)).named("i64")
+      .required(INT32).as(intType(8, false)).named("u8")
+      .required(INT32).as(intType(16, false)).named("u16")
+      .required(INT32).as(intType(32, false)).named("u32")
+      .required(INT64).as(intType(64, false)).named("u64")
+      .named("IntMessage");
+
+    assertEquals(expected, parsed);
+    MessageType reparsed = 
MessageTypeParser.parseMessageType(parsed.toString());
+    assertEquals(expected, reparsed);
+  }
+
   @Test
   public void testEmbeddedAnnotations() {
     String message = "message EmbeddedMessage {" +
diff --git 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
index 0b1f41a59..a42e9e33b 100644
--- 
a/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
+++ 
b/parquet-column/src/test/java/org/apache/parquet/schema/TestTypeBuilders.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -1396,6 +1396,47 @@ public PrimitiveType call() {
     });
   }
 
+  @Test
+  public void testDecimalLogicalType() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4)).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDecimalLogicalTypeWithDeprecatedScale() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4)).scale(3).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testDecimalLogicalTypeWithDeprecatedPrecision() {
+    PrimitiveType expected = new PrimitiveType(REQUIRED, BINARY, "aDecimal",
+      LogicalTypeAnnotation.decimalType(3, 4));
+    PrimitiveType actual = Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 
4)).precision(4).named("aDecimal");
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDecimalLogicalTypeWithDeprecatedScaleMismatch() {
+    Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4))
+      .scale(4).named("aDecimal");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDecimalLogicalTypeWithDeprecatedPrecisionMismatch() {
+    Types.required(BINARY)
+      .as(LogicalTypeAnnotation.decimalType(3, 4))
+      .precision(5).named("aDecimal");
+  }
+
   /**
    * A convenience method to avoid a large number of @Test(expected=...) tests
    * @param message A String message to describe this assertion
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index c4e5da3da..b040d27fd 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -39,8 +39,24 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.format.BsonType;
 import org.apache.parquet.format.CompressionCodec;
+import org.apache.parquet.format.DateType;
+import org.apache.parquet.format.DecimalType;
+import org.apache.parquet.format.EnumType;
+import org.apache.parquet.format.IntType;
+import org.apache.parquet.format.JsonType;
+import org.apache.parquet.format.ListType;
+import org.apache.parquet.format.LogicalType;
+import org.apache.parquet.format.MapType;
+import org.apache.parquet.format.MicroSeconds;
+import org.apache.parquet.format.MilliSeconds;
+import org.apache.parquet.format.NullType;
 import org.apache.parquet.format.PageEncodingStats;
+import org.apache.parquet.format.StringType;
+import org.apache.parquet.format.TimeType;
+import org.apache.parquet.format.TimeUnit;
+import org.apache.parquet.format.TimestampType;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.format.ColumnChunk;
 import org.apache.parquet.format.ColumnMetaData;
@@ -75,6 +91,7 @@
 import org.apache.parquet.schema.Type.Repetition;
 import org.apache.parquet.schema.TypeVisitor;
 import org.apache.parquet.schema.Types;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -171,8 +188,9 @@ public void visit(PrimitiveType primitiveType) {
         SchemaElement element = new SchemaElement(primitiveType.getName());
         
element.setRepetition_type(toParquetRepetition(primitiveType.getRepetition()));
         element.setType(getType(primitiveType.getPrimitiveTypeName()));
-        if (primitiveType.getOriginalType() != null) {
-          
element.setConverted_type(getConvertedType(primitiveType.getOriginalType()));
+        if (primitiveType.getLogicalTypeAnnotation() != null) {
+          
element.setConverted_type(convertToConvertedType(primitiveType.getLogicalTypeAnnotation()));
+          
element.setLogicalType(convertToLogicalType(primitiveType.getLogicalTypeAnnotation()));
         }
         if (primitiveType.getDecimalMetadata() != null) {
           
element.setPrecision(primitiveType.getDecimalMetadata().getPrecision());
@@ -200,8 +218,9 @@ public void visit(MessageType messageType) {
       public void visit(GroupType groupType) {
         SchemaElement element = new SchemaElement(groupType.getName());
         
element.setRepetition_type(toParquetRepetition(groupType.getRepetition()));
-        if (groupType.getOriginalType() != null) {
-          
element.setConverted_type(getConvertedType(groupType.getOriginalType()));
+        if (groupType.getLogicalTypeAnnotation() != null) {
+          
element.setConverted_type(convertToConvertedType(groupType.getLogicalTypeAnnotation()));
+          
element.setLogicalType(convertToLogicalType(groupType.getLogicalTypeAnnotation()));
         }
         if (groupType.getId() != null) {
           element.setField_id(groupType.getId().intValue());
@@ -220,6 +239,158 @@ private void visitChildren(final List<SchemaElement> 
result,
     });
   }
 
+  LogicalType convertToLogicalType(LogicalTypeAnnotation 
logicalTypeAnnotation) {
+    LogicalTypeConverterVisitor logicalTypeConverterVisitor = new 
LogicalTypeConverterVisitor();
+    logicalTypeAnnotation.accept(logicalTypeConverterVisitor);
+    return logicalTypeConverterVisitor.logicalType;
+  }
+
+  ConvertedType convertToConvertedType(LogicalTypeAnnotation 
logicalTypeAnnotation) {
+    LogicalTypeConverterVisitor logicalTypeConverterVisitor = new 
LogicalTypeConverterVisitor();
+    logicalTypeAnnotation.accept(logicalTypeConverterVisitor);
+    return logicalTypeConverterVisitor.convertedType;
+  }
+
+
+  static org.apache.parquet.format.TimeUnit 
convertUnit(LogicalTypeAnnotation.TimeUnit unit) {
+    switch (unit) {
+      case MICROS:
+        return org.apache.parquet.format.TimeUnit.MICROS(new MicroSeconds());
+      case MILLIS:
+        return org.apache.parquet.format.TimeUnit.MILLIS(new MilliSeconds());
+      default:
+        throw new RuntimeException("Unknown time unit " + unit);
+    }
+  }
+
+  private static class LogicalTypeConverterVisitor implements 
LogicalTypeAnnotation.LogicalTypeAnnotationVisitor {
+    private LogicalType logicalType;
+    private ConvertedType convertedType;
+
+    @Override
+    public void visit(LogicalTypeAnnotation.StringLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.STRING(new StringType());
+      convertedType = ConvertedType.UTF8;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.MapLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.MAP(new MapType());
+      convertedType = ConvertedType.MAP;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.ListLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.LIST(new ListType());
+      convertedType = ConvertedType.LIST;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.EnumLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.ENUM(new EnumType());
+      convertedType = ConvertedType.ENUM;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.DecimalLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.DECIMAL(new 
DecimalType(logicalTypeAnnotation.getScale(), 
logicalTypeAnnotation.getPrecision()));
+      convertedType = ConvertedType.DECIMAL;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.DateLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.DATE(new DateType());
+      convertedType = ConvertedType.DATE;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.TimeLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.TIME(new 
TimeType(logicalTypeAnnotation.isAdjustedToUTC(), 
convertUnit(logicalTypeAnnotation.getUnit())));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case TIME_MILLIS:
+          convertedType = ConvertedType.TIME_MILLIS;
+          break;
+        case TIME_MICROS:
+          convertedType = ConvertedType.TIME_MICROS;
+          break;
+        default:
+          throw new RuntimeException("Unknown converted type for " + 
logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.TIMESTAMP(new 
TimestampType(logicalTypeAnnotation.isAdjustedToUTC(), 
convertUnit(logicalTypeAnnotation.getUnit())));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case TIMESTAMP_MICROS:
+          convertedType = ConvertedType.TIMESTAMP_MICROS;
+          break;
+        case TIMESTAMP_MILLIS:
+          convertedType = ConvertedType.TIMESTAMP_MILLIS;
+          break;
+        default:
+          throw new RuntimeException("Unknown converted type for " + 
logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.IntLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.INTEGER(new IntType((byte) 
logicalTypeAnnotation.getBitWidth(), logicalTypeAnnotation.isSigned()));
+      switch (logicalTypeAnnotation.toOriginalType()) {
+        case INT_8:
+          convertedType = ConvertedType.INT_8;
+          break;
+        case INT_16:
+          convertedType = ConvertedType.INT_16;
+          break;
+        case INT_32:
+          convertedType = ConvertedType.INT_32;
+          break;
+        case INT_64:
+          convertedType = ConvertedType.INT_64;
+          break;
+        case UINT_8:
+          convertedType = ConvertedType.UINT_8;
+          break;
+        case UINT_16:
+          convertedType = ConvertedType.UINT_16;
+          break;
+        case UINT_32:
+          convertedType = ConvertedType.UINT_32;
+          break;
+        case UINT_64:
+          convertedType = ConvertedType.UINT_64;
+          break;
+        default:
+          throw new RuntimeException("Unknown original type " + 
logicalTypeAnnotation.toOriginalType());
+      }
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.JsonLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.JSON(new JsonType());
+      convertedType = ConvertedType.JSON;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.BsonLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.BSON(new BsonType());
+      convertedType = ConvertedType.BSON;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.IntervalLogicalTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.UNKNOWN(new NullType());
+      convertedType = ConvertedType.INTERVAL;
+    }
+
+    @Override
+    public void visit(LogicalTypeAnnotation.MapKeyValueTypeAnnotation 
logicalTypeAnnotation) {
+      logicalType = LogicalType.UNKNOWN(new NullType());
+      convertedType = ConvertedType.MAP_KEY_VALUE;
+    }
+  }
+
   private void addRowGroup(ParquetMetadata parquetMetadata, List<RowGroup> 
rowGroups, BlockMetaData block) {
     //rowGroup.total_byte_size = ;
     List<ColumnChunkMetaData> columns = block.getColumns();
@@ -586,108 +757,104 @@ Type getType(PrimitiveTypeName type) {
   }
 
   // Visible for testing
-  OriginalType getOriginalType(ConvertedType type) {
+  LogicalTypeAnnotation getOriginalType(ConvertedType type, SchemaElement 
schemaElement) {
     switch (type) {
       case UTF8:
-        return OriginalType.UTF8;
+        return LogicalTypeAnnotation.stringType();
       case MAP:
-        return OriginalType.MAP;
+        return LogicalTypeAnnotation.mapType();
       case MAP_KEY_VALUE:
-        return OriginalType.MAP_KEY_VALUE;
+        return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance();
       case LIST:
-        return OriginalType.LIST;
+        return LogicalTypeAnnotation.listType();
       case ENUM:
-        return OriginalType.ENUM;
+        return LogicalTypeAnnotation.enumType();
       case DECIMAL:
-        return OriginalType.DECIMAL;
+        int scale = (schemaElement == null ? 0 : schemaElement.scale);
+        int precision = (schemaElement == null ? 0 : schemaElement.precision);
+        return LogicalTypeAnnotation.decimalType(scale, precision);
       case DATE:
-        return OriginalType.DATE;
+        return LogicalTypeAnnotation.dateType();
       case TIME_MILLIS:
-        return OriginalType.TIME_MILLIS;
+        return LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MILLIS);
       case TIME_MICROS:
-        return OriginalType.TIME_MICROS;
+        return LogicalTypeAnnotation.timeType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS);
       case TIMESTAMP_MILLIS:
-        return OriginalType.TIMESTAMP_MILLIS;
+        return LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MILLIS);
       case TIMESTAMP_MICROS:
-        return OriginalType.TIMESTAMP_MICROS;
+        return LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS);
       case INTERVAL:
-        return OriginalType.INTERVAL;
+        return 
LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance();
       case INT_8:
-        return OriginalType.INT_8;
+        return LogicalTypeAnnotation.intType(8, true);
       case INT_16:
-        return OriginalType.INT_16;
+        return LogicalTypeAnnotation.intType(16, true);
       case INT_32:
-        return OriginalType.INT_32;
+        return LogicalTypeAnnotation.intType(32, true);
       case INT_64:
-        return OriginalType.INT_64;
+        return LogicalTypeAnnotation.intType(64, true);
       case UINT_8:
-        return OriginalType.UINT_8;
+        return LogicalTypeAnnotation.intType(8, false);
       case UINT_16:
-        return OriginalType.UINT_16;
+        return LogicalTypeAnnotation.intType(16, false);
       case UINT_32:
-        return OriginalType.UINT_32;
+        return LogicalTypeAnnotation.intType(32, false);
       case UINT_64:
-        return OriginalType.UINT_64;
+        return LogicalTypeAnnotation.intType(64, false);
       case JSON:
-        return OriginalType.JSON;
+        return LogicalTypeAnnotation.jsonType();
       case BSON:
-        return OriginalType.BSON;
+        return LogicalTypeAnnotation.bsonType();
       default:
-        throw new RuntimeException("Unknown converted type " + type);
+        throw new RuntimeException("Can't convert converted type to logical 
type, unknown converted type " + type);
     }
   }
 
-  // Visible for testing
-  ConvertedType getConvertedType(OriginalType type) {
-    switch (type) {
-      case UTF8:
-        return ConvertedType.UTF8;
+  LogicalTypeAnnotation getOriginalType(LogicalType type) {
+    switch (type.getSetField()) {
       case MAP:
-        return ConvertedType.MAP;
-      case MAP_KEY_VALUE:
-        return ConvertedType.MAP_KEY_VALUE;
-      case LIST:
-        return ConvertedType.LIST;
-      case ENUM:
-        return ConvertedType.ENUM;
-      case DECIMAL:
-        return ConvertedType.DECIMAL;
+        return LogicalTypeAnnotation.mapType();
+      case BSON:
+        return LogicalTypeAnnotation.bsonType();
       case DATE:
-        return ConvertedType.DATE;
-      case TIME_MILLIS:
-        return ConvertedType.TIME_MILLIS;
-      case TIME_MICROS:
-        return ConvertedType.TIME_MICROS;
-      case TIMESTAMP_MILLIS:
-        return ConvertedType.TIMESTAMP_MILLIS;
-      case TIMESTAMP_MICROS:
-        return ConvertedType.TIMESTAMP_MICROS;
-      case INTERVAL:
-        return ConvertedType.INTERVAL;
-      case INT_8:
-        return ConvertedType.INT_8;
-      case INT_16:
-        return ConvertedType.INT_16;
-      case INT_32:
-        return ConvertedType.INT_32;
-      case INT_64:
-        return ConvertedType.INT_64;
-      case UINT_8:
-        return ConvertedType.UINT_8;
-      case UINT_16:
-        return ConvertedType.UINT_16;
-      case UINT_32:
-        return ConvertedType.UINT_32;
-      case UINT_64:
-        return ConvertedType.UINT_64;
+        return LogicalTypeAnnotation.dateType();
+      case ENUM:
+        return LogicalTypeAnnotation.enumType();
       case JSON:
-        return ConvertedType.JSON;
-      case BSON:
-        return ConvertedType.BSON;
+        return LogicalTypeAnnotation.jsonType();
+      case LIST:
+        return LogicalTypeAnnotation.listType();
+      case TIME:
+        TimeType time = type.getTIME();
+        return LogicalTypeAnnotation.timeType(time.isAdjustedToUTC, 
convertTimeUnit(time.unit));
+      case STRING:
+        return LogicalTypeAnnotation.stringType();
+      case DECIMAL:
+        DecimalType decimal = type.getDECIMAL();
+        return LogicalTypeAnnotation.decimalType(decimal.scale, 
decimal.precision);
+      case INTEGER:
+        IntType integer = type.getINTEGER();
+        return LogicalTypeAnnotation.intType(integer.bitWidth, 
integer.isSigned);
+      case UNKNOWN:
+        return null;
+      case TIMESTAMP:
+        TimestampType timestamp = type.getTIMESTAMP();
+        return LogicalTypeAnnotation.timestampType(timestamp.isAdjustedToUTC, 
convertTimeUnit(timestamp.unit));
       default:
-        throw new RuntimeException("Unknown original type " + type);
-     }
-   }
+        throw new RuntimeException("Unknown logical type " + type);
+    }
+  }
+
+  private LogicalTypeAnnotation.TimeUnit convertTimeUnit(TimeUnit unit) {
+    switch (unit.getSetField()) {
+      case MICROS:
+        return LogicalTypeAnnotation.TimeUnit.MICROS;
+      case MILLIS:
+        return LogicalTypeAnnotation.TimeUnit.MILLIS;
+      default:
+        throw new RuntimeException("Unknown time unit " + unit);
+    }
+  }
 
   private static void addKeyValue(FileMetaData fileMetaData, String key, 
String value) {
     KeyValue keyValue = new KeyValue(key);
@@ -985,8 +1152,15 @@ private void buildChildren(Types.GroupBuilder builder,
         buildChildren((Types.GroupBuilder) childBuilder, schema, 
schemaElement.num_children, columnOrders, columnCount);
       }
 
+      if (schemaElement.isSetLogicalType()) {
+        childBuilder.as(getOriginalType(schemaElement.logicalType));
+      }
       if (schemaElement.isSetConverted_type()) {
-        childBuilder.as(getOriginalType(schemaElement.converted_type));
+        LogicalTypeAnnotation originalType = 
getOriginalType(schemaElement.converted_type, schemaElement);
+        LogicalTypeAnnotation newLogicalType = 
getOriginalType(schemaElement.logicalType);
+        if (!originalType.equals(newLogicalType)) {
+          childBuilder.as(getOriginalType(schemaElement.converted_type, 
schemaElement));
+        }
       }
       if (schemaElement.isSetField_id()) {
         childBuilder.id(schemaElement.field_id);
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
index cb6af54eb..523b6b36f 100755
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ParquetMetadata.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,8 +27,7 @@
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.map.JsonMappingException;
 import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectWriter;
-import org.codehaus.jackson.map.SerializationConfig.Feature;
+import org.codehaus.jackson.map.SerializationConfig;
 
 /**
  * Meta Data block stored in the footer of the file
@@ -41,6 +40,12 @@
 
   private static final ObjectMapper objectMapper = new ObjectMapper();
 
+  // Enable FAIL_ON_EMPTY_BEANS on objectmapper. Without this feature 
parquet-casdacing tests fail,
+  // because LogicalTypeAnnotation implementations are classes without any 
property.
+  static {
+    objectMapper.configure(SerializationConfig.Feature.FAIL_ON_EMPTY_BEANS, 
false);
+  }
+
   /**
    *
    * @param parquetMetaData
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
index b83da5dbd..0ab93703f 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/format/converter/TestParquetMetadataConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -60,6 +60,8 @@
 import org.apache.parquet.column.statistics.IntStatistics;
 import org.apache.parquet.column.statistics.LongStatistics;
 import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.format.DecimalType;
+import org.apache.parquet.format.LogicalType;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
@@ -67,6 +69,7 @@
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.junit.Assert;
 import org.junit.Test;
 import org.apache.parquet.example.Paper;
@@ -130,12 +133,14 @@ public void testSchemaConverterDecimal() {
             .setRepetition_type(FieldRepetitionType.REQUIRED)
             .setType(Type.BYTE_ARRAY)
             .setConverted_type(ConvertedType.DECIMAL)
+            .setLogicalType(LogicalType.DECIMAL(new DecimalType(2, 9)))
             .setPrecision(9).setScale(2),
         new SchemaElement("aFixedDecimal")
             .setRepetition_type(FieldRepetitionType.OPTIONAL)
             .setType(Type.FIXED_LEN_BYTE_ARRAY)
             .setType_length(4)
             .setConverted_type(ConvertedType.DECIMAL)
+            .setLogicalType(LogicalType.DECIMAL(new DecimalType(2, 9)))
             .setPrecision(9).setScale(2)
     );
     Assert.assertEquals(expected, schemaElements);
@@ -163,10 +168,11 @@ public void testEnumEquivalence() {
       assertEquals(type, 
parquetMetadataConverter.getType(parquetMetadataConverter.getPrimitive(type)));
     }
     for (OriginalType original : OriginalType.values()) {
-      assertEquals(original, 
parquetMetadataConverter.getOriginalType(parquetMetadataConverter.getConvertedType(original)));
+      assertEquals(original, parquetMetadataConverter.getOriginalType(
+        
parquetMetadataConverter.convertToConvertedType(LogicalTypeAnnotation.fromOriginalType(original,
 null)), null).toOriginalType());
     }
     for (ConvertedType converted : ConvertedType.values()) {
-      assertEquals(converted, 
parquetMetadataConverter.getConvertedType(parquetMetadataConverter.getOriginalType(converted)));
+      assertEquals(converted, 
parquetMetadataConverter.convertToConvertedType(parquetMetadataConverter.getOriginalType(converted,
 null)));
     }
   }
 
@@ -336,7 +342,7 @@ private ColumnChunkMetaData createColumnChunkMetaData() {
             0, 0, 0, 0, 0);
     return md;
   }
-  
+
   @Test
   public void testEncodingsCache() {
     ParquetMetadataConverter parquetMetadataConverter = new 
ParquetMetadataConverter();
diff --git 
a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java 
b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
index cf995346e..d1d896f79 100644
--- a/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
+++ b/parquet-pig/src/main/java/org/apache/parquet/pig/PigSchemaConverter.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Support for new logical type representation
> -------------------------------------------
>
>                 Key: PARQUET-1253
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1253
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Nandor Kollar
>            Assignee: Nandor Kollar
>            Priority: Major
>
> Latest parquet-format 
> [introduced|https://github.com/apache/parquet-format/commit/863875e0be3237c6aa4ed71733d54c91a51deabe#diff-0f9d1b5347959e15259da7ba8f4b6252]
>  a new representation for logical types. As of now this is not yet supported 
> in parquet-mr, thus there's no way to use parametrized UTC normalized 
> timestamp data types. When reading and writing Parquet files, besides 
> 'converted_type' parquet-mr should use the new 'logicalType' field in 
> SchemaElement to tell the current logical type annotation. To maintain 
> backward compatibility, the semantic of converted_type shouldn't change.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to