This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7ca0a09 [SPARK-34661][SQL] Clean up `OriginalType` and `DecimalMetadata ` usage in Parquet related code 7ca0a09 is described below commit 7ca0a0910f6ea42086c64ef8eba2f21988015dd2 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Sun May 16 09:03:26 2021 -0500 [SPARK-34661][SQL] Clean up `OriginalType` and `DecimalMetadata ` usage in Parquet related code ### What changes were proposed in this pull request? `OriginalType` and `DecimalMetadata` have been marked as `Deprecated` in newer Parquet code. `Apache Parquet` suggests replacing `OriginalType` with `LogicalTypeAnnotation` and `DecimalMetadata` with `DecimalLogicalTypeAnnotation`, so the main change of this PR is to clean up these deprecated usages in Parquet-related code. ### Why are the changes needed? Clean up deprecated API usage. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Passed the Jenkins or GitHub Actions checks. Closes #31776 from LuciferYang/cleanup-parquet-dep-api. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../parquet/VectorizedColumnReader.java | 69 +++++++---- .../parquet/VectorizedParquetRecordReader.java | 2 +- .../datasources/parquet/ParquetFilters.scala | 105 +++++++++-------- .../datasources/parquet/ParquetReadSupport.scala | 18 +-- .../datasources/parquet/ParquetRowConverter.scala | 88 ++++++++------ .../parquet/ParquetSchemaConverter.scala | 131 ++++++++++++--------- 6 files changed, 244 insertions(+), 169 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 52620b0..8932916 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -32,8 +32,12 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.page.*; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.io.api.Binary; -import org.apache.parquet.schema.DecimalMetadata; -import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DateLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimestampLogicalTypeAnnotation; +import org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit; import org.apache.parquet.schema.PrimitiveType; import org.apache.spark.sql.catalyst.util.DateTimeUtils; @@ -101,7 +105,7 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; - private 
final OriginalType originalType; + private final LogicalTypeAnnotation logicalTypeAnnotation; // The timezone conversion to apply to int96 timestamps. Null if no conversion. private final ZoneId convertTz; private static final ZoneId UTC = ZoneOffset.UTC; @@ -110,10 +114,14 @@ public class VectorizedColumnReader { private boolean isDecimalTypeMatched(DataType dt) { DecimalType d = (DecimalType) dt; - DecimalMetadata dm = descriptor.getPrimitiveType().getDecimalMetadata(); - // It's OK if the required decimal precision is larger than or equal to the physical decimal - // precision in the Parquet metadata, as long as the decimal scale is the same. - return dm != null && dm.getPrecision() <= d.precision() && dm.getScale() == d.scale(); + LogicalTypeAnnotation typeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation(); + if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) { + DecimalLogicalTypeAnnotation decimalType = (DecimalLogicalTypeAnnotation) typeAnnotation; + // It's OK if the required decimal precision is larger than or equal to the physical decimal + // precision in the Parquet metadata, as long as the decimal scale is the same. 
+ return decimalType.getPrecision() <= d.precision() && decimalType.getScale() == d.scale(); + } + return false; } private boolean canReadAsIntDecimal(DataType dt) { @@ -133,7 +141,7 @@ public class VectorizedColumnReader { public VectorizedColumnReader( ColumnDescriptor descriptor, - OriginalType originalType, + LogicalTypeAnnotation logicalTypeAnnotation, PageReader pageReader, ZoneId convertTz, String datetimeRebaseMode, @@ -141,7 +149,7 @@ public class VectorizedColumnReader { this.descriptor = descriptor; this.pageReader = pageReader; this.convertTz = convertTz; - this.originalType = originalType; + this.logicalTypeAnnotation = logicalTypeAnnotation; this.maxDefLevel = descriptor.getMaxDefinitionLevel(); DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -172,13 +180,14 @@ public class VectorizedColumnReader { boolean isSupported = false; switch (typeName) { case INT32: - isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode); + isSupported = !(logicalTypeAnnotation instanceof DateLogicalTypeAnnotation) || + "CORRECTED".equals(datetimeRebaseMode); break; case INT64: - if (originalType == OriginalType.TIMESTAMP_MICROS) { + if (isTimestampTypeMatched(TimeUnit.MICROS)) { isSupported = "CORRECTED".equals(datetimeRebaseMode); } else { - isSupported = originalType != OriginalType.TIMESTAMP_MILLIS; + isSupported = !isTimestampTypeMatched(TimeUnit.MILLIS); } break; case FLOAT: @@ -263,17 +272,18 @@ public class VectorizedColumnReader { // We need to make sure that we initialize the right type for the dictionary otherwise // WritableColumnVector will throw an exception when trying to decode to an Int when the // dictionary is in fact initialized as Long - boolean castLongToInt = primitiveType.getOriginalType() == OriginalType.DECIMAL && - primitiveType.getDecimalMetadata().getPrecision() <= Decimal.MAX_INT_DIGITS() && - primitiveType.getPrimitiveTypeName() == INT64; + LogicalTypeAnnotation typeAnnotation = 
primitiveType.getLogicalTypeAnnotation(); + boolean castLongToInt = typeAnnotation instanceof DecimalLogicalTypeAnnotation && + ((DecimalLogicalTypeAnnotation) typeAnnotation).getPrecision() <= + Decimal.MAX_INT_DIGITS() && primitiveType.getPrimitiveTypeName() == INT64; // We require a long value, but we need to use dictionary to decode the original // signed int first - boolean isUnsignedInt32 = primitiveType.getOriginalType() == OriginalType.UINT_32; + boolean isUnsignedInt32 = isUnsignedIntTypeMatched(32); // We require a decimal value, but we need to use dictionary to decode the original // signed long first - boolean isUnsignedInt64 = primitiveType.getOriginalType() == OriginalType.UINT_64; + boolean isUnsignedInt64 = isUnsignedIntTypeMatched(64); boolean needTransform = castLongToInt || isUnsignedInt32 || isUnsignedInt64; column.setDictionary(new ParquetDictionary(dictionary, needTransform)); @@ -398,14 +408,14 @@ public class VectorizedColumnReader { case INT64: if (column.dataType() == DataTypes.LongType || canReadAsLongDecimal(column.dataType()) || - (originalType == OriginalType.TIMESTAMP_MICROS && + (isTimestampTypeMatched(TimeUnit.MICROS) && "CORRECTED".equals(datetimeRebaseMode))) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i))); } } - } else if (originalType == OriginalType.UINT_64) { + } else if (isUnsignedIntTypeMatched(64)) { // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0). // For unsigned int64, it stores as dictionary encoded signed int64 in Parquet // whenever dictionary is available. 
@@ -418,7 +428,7 @@ public class VectorizedColumnReader { column.putByteArray(i, unsigned); } } - } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { + } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) { if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { @@ -436,7 +446,7 @@ public class VectorizedColumnReader { } } } - } else if (originalType == OriginalType.TIMESTAMP_MICROS) { + } else if (isTimestampTypeMatched(TimeUnit.MICROS)) { final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode); for (int i = rowId; i < rowId + num; ++i) { if (!column.isNullAt(i)) { @@ -611,13 +621,13 @@ public class VectorizedColumnReader { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, DecimalType.is32BitDecimalType(column.dataType())); - } else if (originalType == OriginalType.UINT_64) { + } else if (isUnsignedIntTypeMatched(64)) { // In `ParquetToSparkSchemaConverter`, we map parquet UINT64 to our Decimal(20, 0). // For unsigned int64, it stores as plain signed int64 in Parquet when dictionary fallbacks. // We read them as decimal values. 
defColumn.readUnsignedLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn); - } else if (originalType == OriginalType.TIMESTAMP_MICROS) { + } else if (isTimestampTypeMatched(TimeUnit.MICROS)) { if ("CORRECTED".equals(datetimeRebaseMode)) { defColumn.readLongs( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, false); @@ -626,7 +636,7 @@ public class VectorizedColumnReader { defColumn.readLongsWithRebase( num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase); } - } else if (originalType == OriginalType.TIMESTAMP_MILLIS) { + } else if (isTimestampTypeMatched(TimeUnit.MILLIS)) { if ("CORRECTED".equals(datetimeRebaseMode)) { for (int i = 0; i < num; i++) { if (defColumn.readInteger() == maxDefLevel) { @@ -871,4 +881,15 @@ public class VectorizedColumnReader { throw new IOException("could not read page " + page + " in col " + descriptor, e); } } + + private boolean isTimestampTypeMatched(TimeUnit unit) { + return logicalTypeAnnotation instanceof TimestampLogicalTypeAnnotation && + ((TimestampLogicalTypeAnnotation) logicalTypeAnnotation).getUnit() == unit; + } + + private boolean isUnsignedIntTypeMatched(int bitWidth) { + return logicalTypeAnnotation instanceof IntLogicalTypeAnnotation && + !((IntLogicalTypeAnnotation) logicalTypeAnnotation).isSigned() && + ((IntLogicalTypeAnnotation) logicalTypeAnnotation).getBitWidth() == bitWidth; + } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 1b15953..3245527 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -332,7 +332,7 @@ public class VectorizedParquetRecordReader extends 
SpecificParquetRecordReaderBa if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader( columns.get(i), - types.get(i).getOriginalType(), + types.get(i).getLogicalTypeAnnotation(), pages.getPageReader(columns.get(i)), convertTz, datetimeRebaseMode, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 73910c3..6eb4573 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -28,8 +28,8 @@ import scala.collection.JavaConverters.asScalaBufferConverter import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.SparkFilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.{DecimalMetadata, GroupType, MessageType, OriginalType, PrimitiveComparator, PrimitiveType, Type} -import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType, PrimitiveComparator, PrimitiveType, Type} +import org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, TimeUnit} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ @@ -62,8 +62,8 @@ class ParquetFilters( fields.flatMap { case p: PrimitiveType => Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ p.getName, - fieldType = ParquetSchemaType(p.getOriginalType, - p.getPrimitiveTypeName, p.getTypeLength, p.getDecimalMetadata))) + fieldType = ParquetSchemaType(p.getLogicalTypeAnnotation, + p.getPrimitiveTypeName, p.getTypeLength))) // Note that when g is a `Struct`, `g.getOriginalType` is `null`. // When g is a `Map`, `g.getOriginalType` is `MAP`. 
// When g is a `List`, `g.getOriginalType` is `LIST`. @@ -105,23 +105,28 @@ class ParquetFilters( fieldType: ParquetSchemaType) private case class ParquetSchemaType( - originalType: OriginalType, + logicalTypeAnnotation: LogicalTypeAnnotation, primitiveTypeName: PrimitiveTypeName, - length: Int, - decimalMetadata: DecimalMetadata) - - private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0, null) - private val ParquetByteType = ParquetSchemaType(INT_8, INT32, 0, null) - private val ParquetShortType = ParquetSchemaType(INT_16, INT32, 0, null) - private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0, null) - private val ParquetLongType = ParquetSchemaType(null, INT64, 0, null) - private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0, null) - private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0, null) - private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, 0, null) - private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 0, null) - private val ParquetDateType = ParquetSchemaType(DATE, INT32, 0, null) - private val ParquetTimestampMicrosType = ParquetSchemaType(TIMESTAMP_MICROS, INT64, 0, null) - private val ParquetTimestampMillisType = ParquetSchemaType(TIMESTAMP_MILLIS, INT64, 0, null) + length: Int) + + private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, 0) + private val ParquetByteType = + ParquetSchemaType(LogicalTypeAnnotation.intType(8, true), INT32, 0) + private val ParquetShortType = + ParquetSchemaType(LogicalTypeAnnotation.intType(16, true), INT32, 0) + private val ParquetIntegerType = ParquetSchemaType(null, INT32, 0) + private val ParquetLongType = ParquetSchemaType(null, INT64, 0) + private val ParquetFloatType = ParquetSchemaType(null, FLOAT, 0) + private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, 0) + private val ParquetStringType = + ParquetSchemaType(LogicalTypeAnnotation.stringType(), BINARY, 0) + private val ParquetBinaryType = ParquetSchemaType(null, BINARY, 
0) + private val ParquetDateType = + ParquetSchemaType(LogicalTypeAnnotation.dateType(), INT32, 0) + private val ParquetTimestampMicrosType = + ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS), INT64, 0) + private val ParquetTimestampMillisType = + ParquetSchemaType(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS), INT64, 0) private def dateToDays(date: Any): Int = date match { case d: Date => DateTimeUtils.fromJavaDate(d) @@ -195,15 +200,16 @@ class ParquetFilters( longColumn(n), Option(v).map(timestampToMillis).orNull) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.eq( intColumn(n), Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) @@ -245,15 +251,16 @@ class ParquetFilters( longColumn(n), Option(v).map(timestampToMillis).orNull) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(d => decimalToInt32(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: 
DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), Option(v).map(d => decimalToInt64(d.asInstanceOf[JBigDecimal])).orNull) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(d => decimalToByteArray(d.asInstanceOf[JBigDecimal], length)).orNull) @@ -285,13 +292,14 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v)) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.lt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -322,13 +330,14 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v)) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), 
decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.ltEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -359,13 +368,14 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v)) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gt(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -396,13 +406,14 @@ class ParquetFilters( case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v)) - case ParquetSchemaType(DECIMAL, INT32, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT32, _) if pushDownDecimal => (n: 
Array[String], v: Any) => FilterApi.gtEq(intColumn(n), decimalToInt32(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, INT64, _, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, INT64, _) if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), decimalToInt64(v.asInstanceOf[JBigDecimal])) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, length, _) if pushDownDecimal => + case ParquetSchemaType(_: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, length) + if pushDownDecimal => (n: Array[String], v: Any) => FilterApi.gtEq(binaryColumn(n), decimalToByteArray(v.asInstanceOf[JBigDecimal], length)) } @@ -469,21 +480,23 @@ class ParquetFilters( value.isInstanceOf[Date] || value.isInstanceOf[LocalDate] case ParquetTimestampMicrosType | ParquetTimestampMillisType => value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant] - case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) - case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) - case ParquetSchemaType(DECIMAL, FIXED_LEN_BYTE_ARRAY, _, decimalMeta) => - isDecimalMatched(value, decimalMeta) + case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT32, _) => + isDecimalMatched(value, decimalType) + case ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, INT64, _) => + isDecimalMatched(value, decimalType) + case + ParquetSchemaType(decimalType: DecimalLogicalTypeAnnotation, FIXED_LEN_BYTE_ARRAY, _) => + isDecimalMatched(value, decimalType) case _ => false }) } // Decimal type must make sure that filter value's scale matched the file. // If doesn't matched, which would cause data corruption. 
- private def isDecimalMatched(value: Any, decimalMeta: DecimalMetadata): Boolean = value match { + private def isDecimalMatched(value: Any, + decimalLogicalType: DecimalLogicalTypeAnnotation): Boolean = value match { case decimal: JBigDecimal => - decimal.scale == decimalMeta.getScale + decimal.scale == decimalLogicalType.getScale case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index ce06620..597a1bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -27,6 +27,7 @@ import org.apache.parquet.hadoop.api.{InitContext, ReadSupport} import org.apache.parquet.hadoop.api.ReadSupport.ReadContext import org.apache.parquet.io.api.RecordMaterializer import org.apache.parquet.schema._ +import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation import org.apache.parquet.schema.Type.Repetition import org.apache.spark.internal.Logging @@ -214,13 +215,14 @@ object ParquetReadSupport { // Unannotated repeated group should be interpreted as required list of required element, so // list element type is just the group itself. Clip it. - if (parquetList.getOriginalType == null && parquetList.isRepetition(Repetition.REPEATED)) { + if (parquetList.getLogicalTypeAnnotation == null && + parquetList.isRepetition(Repetition.REPEATED)) { clipParquetType(parquetList, elementType, caseSensitive) } else { assert( - parquetList.getOriginalType == OriginalType.LIST, + parquetList.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation], "Invalid Parquet schema. 
" + - "Original type of annotated Parquet lists must be LIST: " + + "Logical type annotation of annotated Parquet lists must be ListLogicalTypeAnnotation: " + parquetList.toString) assert( @@ -246,7 +248,7 @@ object ParquetReadSupport { ) { Types .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) + .as(LogicalTypeAnnotation.listType()) .addField(clipParquetType(repeatedGroup, elementType, caseSensitive)) .named(parquetList.getName) } else { @@ -254,7 +256,7 @@ object ParquetReadSupport { // repetition. Types .buildGroup(parquetList.getRepetition) - .as(OriginalType.LIST) + .as(LogicalTypeAnnotation.listType()) .addField( Types .repeatedGroup() @@ -285,14 +287,14 @@ object ParquetReadSupport { val clippedRepeatedGroup = Types .repeatedGroup() - .as(repeatedGroup.getOriginalType) + .as(repeatedGroup.getLogicalTypeAnnotation) .addField(clipParquetType(parquetKeyType, keyType, caseSensitive)) .addField(clipParquetType(parquetValueType, valueType, caseSensitive)) .named(repeatedGroup.getName) Types .buildGroup(parquetMap.getRepetition) - .as(parquetMap.getOriginalType) + .as(parquetMap.getLogicalTypeAnnotation) .addField(clippedRepeatedGroup) .named(parquetMap.getName) } @@ -310,7 +312,7 @@ object ParquetReadSupport { val clippedParquetFields = clipParquetGroupFields(parquetRecord, structType, caseSensitive) Types .buildGroup(parquetRecord.getRepetition) - .as(parquetRecord.getOriginalType) + .as(parquetRecord.getLogicalTypeAnnotation) .addFields(clippedParquetFields: _*) .named(parquetRecord.getName) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 0a1cca7..0556257 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala 
@@ -26,8 +26,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} -import org.apache.parquet.schema.{GroupType, OriginalType, Type} -import org.apache.parquet.schema.OriginalType.LIST +import org.apache.parquet.schema.{GroupType, Type} +import org.apache.parquet.schema.LogicalTypeAnnotation._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, FIXED_LEN_BYTE_ARRAY, INT32, INT64, INT96} import org.apache.spark.internal.Logging @@ -110,12 +110,12 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * - a root [[ParquetRowConverter]] for [[org.apache.parquet.schema.MessageType]] `root`, * which contains: * - a [[ParquetPrimitiveConverter]] for required - * [[org.apache.parquet.schema.OriginalType.INT_32]] field `f1`, and + * [[org.apache.parquet.schema.LogicalTypeAnnotation.intType(32, true)]] field `f1`, and * - a nested [[ParquetRowConverter]] for optional [[GroupType]] `f2`, which contains: * - a [[ParquetPrimitiveConverter]] for required * [[org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE]] field `f21`, and - * - a [[ParquetStringConverter]] for optional [[org.apache.parquet.schema.OriginalType.UTF8]] - * string field `f22` + * - a [[ParquetStringConverter]] for optional + * [[org.apache.parquet.schema.LogicalTypeAnnotation.stringType()]] string field `f22` * * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. 
@@ -251,8 +251,15 @@ private[parquet] class ParquetRowConverter( catalystType: DataType, updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { + def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = { + parquetType.getLogicalTypeAnnotation match { + case i: IntLogicalTypeAnnotation if !i.isSigned => i.getBitWidth == bitWidth + case _ => false + } + } + catalystType match { - case LongType if parquetType.getOriginalType == OriginalType.UINT_32 => + case LongType if isUnsignedIntTypeMatched(32) => new ParquetPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setLong(Integer.toUnsignedLong(value)) @@ -273,20 +280,20 @@ private[parquet] class ParquetRowConverter( } // For INT32 backed decimals - case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => - val metadata = parquetType.asPrimitiveType().getDecimalMetadata - if (metadata == null) { - // If the column is a plain INT32, we should pick the precision that can host the largest - // INT32 value. - new ParquetIntDictionaryAwareDecimalConverter( - DecimalType.IntDecimal.precision, 0, updater) - } else { - new ParquetIntDictionaryAwareDecimalConverter( - metadata.getPrecision, metadata.getScale, updater) + case _: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 => + parquetType.asPrimitiveType().getLogicalTypeAnnotation match { + case decimalType: DecimalLogicalTypeAnnotation => + new ParquetIntDictionaryAwareDecimalConverter( + decimalType.getPrecision, decimalType.getScale, updater) + case _ => + // If the column is a plain INT32, we should pick the precision that can host the + // largest INT32 value. 
+ new ParquetIntDictionaryAwareDecimalConverter( + DecimalType.IntDecimal.precision, 0, updater) } // For unsigned int64 - case _: DecimalType if parquetType.getOriginalType == OriginalType.UINT_64 => + case _: DecimalType if isUnsignedIntTypeMatched(64) => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { updater.set(Decimal(java.lang.Long.toUnsignedString(value))) @@ -295,29 +302,29 @@ private[parquet] class ParquetRowConverter( // For INT64 backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 => - val metadata = parquetType.asPrimitiveType().getDecimalMetadata - if (metadata == null) { - // If the column is a plain INT64, we should pick the precision that can host the largest - // INT64 value. - new ParquetLongDictionaryAwareDecimalConverter( - DecimalType.LongDecimal.precision, 0, updater) - } else { - new ParquetLongDictionaryAwareDecimalConverter( - metadata.getPrecision, metadata.getScale, updater) + parquetType.asPrimitiveType().getLogicalTypeAnnotation match { + case decimalType: DecimalLogicalTypeAnnotation => + new ParquetLongDictionaryAwareDecimalConverter( + decimalType.getPrecision, decimalType.getScale, updater) + case _ => + // If the column is a plain INT64, we should pick the precision that can host the + // largest INT64 value. + new ParquetLongDictionaryAwareDecimalConverter( + DecimalType.LongDecimal.precision, 0, updater) } // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY || parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY => - val metadata = parquetType.asPrimitiveType().getDecimalMetadata - if (metadata == null) { - throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " + - s"whose Parquet type is $parquetType without decimal metadata. Please read this " + - "column/field as Spark BINARY type." 
) - } else { - new ParquetBinaryDictionaryAwareDecimalConverter( - metadata.getPrecision, metadata.getScale, updater) + parquetType.asPrimitiveType().getLogicalTypeAnnotation match { + case decimalType: DecimalLogicalTypeAnnotation => + new ParquetBinaryDictionaryAwareDecimalConverter( + decimalType.getPrecision, decimalType.getScale, updater) + case _ => + throw new RuntimeException(s"Unable to create Parquet converter for ${t.typeName} " + + s"whose Parquet type is $parquetType without decimal metadata. Please read this " + + "column/field as Spark BINARY type." ) } case t: DecimalType => @@ -329,14 +336,20 @@ private[parquet] class ParquetRowConverter( case StringType => new ParquetStringConverter(updater) - case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MICROS => + case TimestampType + if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] && + parquetType.getLogicalTypeAnnotation + .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { updater.setLong(timestampRebaseFunc(value)) } } - case TimestampType if parquetType.getOriginalType == OriginalType.TIMESTAMP_MILLIS => + case TimestampType + if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] && + parquetType.getLogicalTypeAnnotation + .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS => new ParquetPrimitiveConverter(updater) { override def addLong(value: Long): Unit = { val micros = DateTimeUtils.millisToMicros(value) @@ -367,7 +380,8 @@ private[parquet] class ParquetRowConverter( // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor // annotated by `LIST` or `MAP` should be interpreted as a required list of required // elements where the element type is the type of the field. 
- case t: ArrayType if parquetType.getOriginalType != LIST => + case t: ArrayType + if !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] => if (parquetType.isPrimitive) { new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala index e751c97..1b26c69 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema._ -import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.LogicalTypeAnnotation._ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.parquet.schema.Type.Repetition._ @@ -93,10 +93,10 @@ class ParquetToSparkSchemaConverter( private def convertPrimitiveField(field: PrimitiveType): DataType = { val typeName = field.getPrimitiveTypeName - val originalType = field.getOriginalType + val typeAnnotation = field.getLogicalTypeAnnotation def typeString = - if (originalType == null) s"$typeName" else s"$typeName ($originalType)" + if (typeAnnotation == null) s"$typeName" else s"$typeName ($typeAnnotation)" def typeNotImplemented() = throw QueryCompilationErrors.parquetTypeUnsupportedYetError(typeString) @@ -108,8 +108,10 @@ class ParquetToSparkSchemaConverter( // specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored // as binaries with variable lengths. 
def makeDecimalType(maxPrecision: Int = -1): DecimalType = { - val precision = field.getDecimalMetadata.getPrecision - val scale = field.getDecimalMetadata.getScale + val decimalLogicalTypeAnnotation = field.getLogicalTypeAnnotation + .asInstanceOf[DecimalLogicalTypeAnnotation] + val precision = decimalLogicalTypeAnnotation.getPrecision + val scale = decimalLogicalTypeAnnotation.getScale ParquetSchemaConverter.checkConversionRequirement( maxPrecision == -1 || 1 <= precision && precision <= maxPrecision, @@ -126,26 +128,49 @@ class ParquetToSparkSchemaConverter( case DOUBLE => DoubleType case INT32 => - originalType match { - case INT_8 => ByteType - case INT_16 | UINT_8 => ShortType - case INT_32 | UINT_16 | null => IntegerType - case DATE => DateType - case DECIMAL => makeDecimalType(Decimal.MAX_INT_DIGITS) - case UINT_32 => LongType - case TIME_MILLIS => typeNotImplemented() + typeAnnotation match { + case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned => + intTypeAnnotation.getBitWidth match { + case 8 => ByteType + case 16 => ShortType + case 32 => IntegerType + case _ => illegalType() + } + case null => IntegerType + case _: DateLogicalTypeAnnotation => DateType + case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_INT_DIGITS) + case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned => + intTypeAnnotation.getBitWidth match { + case 8 => ShortType + case 16 => IntegerType + case 32 => LongType + case _ => illegalType() + } + case t: TimestampLogicalTypeAnnotation if t.getUnit == TimeUnit.MILLIS => + typeNotImplemented() case _ => illegalType() } case INT64 => - originalType match { - case INT_64 | null => LongType - case DECIMAL => makeDecimalType(Decimal.MAX_LONG_DIGITS) - // The precision to hold the largest unsigned long is: - // `java.lang.Long.toUnsignedString(-1).length` = 20 - case UINT_64 => DecimalType(20, 0) - case TIMESTAMP_MICROS => TimestampType - case TIMESTAMP_MILLIS => 
TimestampType + typeAnnotation match { + case intTypeAnnotation: IntLogicalTypeAnnotation if intTypeAnnotation.isSigned => + intTypeAnnotation.getBitWidth match { + case 64 => LongType + case _ => illegalType() + } + case null => LongType + case _: DecimalLogicalTypeAnnotation => makeDecimalType(Decimal.MAX_LONG_DIGITS) + case intTypeAnnotation: IntLogicalTypeAnnotation if !intTypeAnnotation.isSigned => + intTypeAnnotation.getBitWidth match { + // The precision to hold the largest unsigned long is: + // `java.lang.Long.toUnsignedString(-1).length` = 20 + case 64 => DecimalType(20, 0) + case _ => illegalType() + } + case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MICROS => + TimestampType + case timestamp: TimestampLogicalTypeAnnotation if timestamp.getUnit == TimeUnit.MILLIS => + TimestampType case _ => illegalType() } @@ -157,19 +182,21 @@ class ParquetToSparkSchemaConverter( TimestampType case BINARY => - originalType match { - case UTF8 | ENUM | JSON => StringType + typeAnnotation match { + case _: StringLogicalTypeAnnotation | _: EnumLogicalTypeAnnotation | + _: JsonLogicalTypeAnnotation => StringType case null if assumeBinaryIsString => StringType case null => BinaryType - case BSON => BinaryType - case DECIMAL => makeDecimalType() + case _: BsonLogicalTypeAnnotation => BinaryType + case _: DecimalLogicalTypeAnnotation => makeDecimalType() case _ => illegalType() } case FIXED_LEN_BYTE_ARRAY => - originalType match { - case DECIMAL => makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) - case INTERVAL => typeNotImplemented() + typeAnnotation match { + case _: DecimalLogicalTypeAnnotation => + makeDecimalType(Decimal.maxPrecisionForBytes(field.getTypeLength)) + case _: IntervalLogicalTypeAnnotation => typeNotImplemented() case _ => illegalType() } @@ -178,7 +205,7 @@ class ParquetToSparkSchemaConverter( } private def convertGroupField(field: GroupType): DataType = { - 
Option(field.getOriginalType).fold(convert(field): DataType) { + Option(field.getLogicalTypeAnnotation).fold(convert(field): DataType) { // A Parquet list is represented as a 3-level structure: // // <list-repetition> group <name> (LIST) { @@ -192,7 +219,7 @@ class ParquetToSparkSchemaConverter( // we need to check whether the 2nd level or the 3rd level refers to list element type. // // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists - case LIST => + case _: ListLogicalTypeAnnotation => ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1, s"Invalid list type $field") @@ -212,7 +239,7 @@ class ParquetToSparkSchemaConverter( // `MAP_KEY_VALUE` is for backwards-compatibility // See: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules-1 // scalastyle:on - case MAP | MAP_KEY_VALUE => + case _: MapLogicalTypeAnnotation | _: MapKeyValueTypeAnnotation => ParquetSchemaConverter.checkConversionRequirement( field.getFieldCount == 1 && !field.getType(0).isPrimitive, s"Invalid map type: $field") @@ -340,10 +367,12 @@ class SparkToParquetSchemaConverter( Types.primitive(BOOLEAN, repetition).named(field.name) case ByteType => - Types.primitive(INT32, repetition).as(INT_8).named(field.name) + Types.primitive(INT32, repetition) + .as(LogicalTypeAnnotation.intType(8, true)).named(field.name) case ShortType => - Types.primitive(INT32, repetition).as(INT_16).named(field.name) + Types.primitive(INT32, repetition) + .as(LogicalTypeAnnotation.intType(16, true)).named(field.name) case IntegerType => Types.primitive(INT32, repetition).named(field.name) @@ -358,10 +387,12 @@ class SparkToParquetSchemaConverter( Types.primitive(DOUBLE, repetition).named(field.name) case StringType => - Types.primitive(BINARY, repetition).as(UTF8).named(field.name) + Types.primitive(BINARY, repetition) + .as(LogicalTypeAnnotation.stringType()).named(field.name) case DateType => - Types.primitive(INT32, 
repetition).as(DATE).named(field.name) + Types.primitive(INT32, repetition) + .as(LogicalTypeAnnotation.dateType()).named(field.name) // NOTE: Spark SQL can write timestamp values to Parquet using INT96, TIMESTAMP_MICROS or // TIMESTAMP_MILLIS. TIMESTAMP_MICROS is recommended but INT96 is the default to keep the @@ -382,9 +413,11 @@ class SparkToParquetSchemaConverter( case SQLConf.ParquetOutputTimestampType.INT96 => Types.primitive(INT96, repetition).named(field.name) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS => - Types.primitive(INT64, repetition).as(TIMESTAMP_MICROS).named(field.name) + Types.primitive(INT64, repetition) + .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MICROS)).named(field.name) case SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS => - Types.primitive(INT64, repetition).as(TIMESTAMP_MILLIS).named(field.name) + Types.primitive(INT64, repetition) + .as(LogicalTypeAnnotation.timestampType(true, TimeUnit.MILLIS)).named(field.name) } case BinaryType => @@ -401,9 +434,7 @@ class SparkToParquetSchemaConverter( case DecimalType.Fixed(precision, scale) if writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(LogicalTypeAnnotation.decimalType(scale, precision)) .length(Decimal.minBytesForPrecision(precision)) .named(field.name) @@ -416,9 +447,7 @@ class SparkToParquetSchemaConverter( if precision <= Decimal.MAX_INT_DIGITS && !writeLegacyParquetFormat => Types .primitive(INT32, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(LogicalTypeAnnotation.decimalType(scale, precision)) .named(field.name) // Uses INT64 for 1 <= precision <= 18 @@ -426,18 +455,14 @@ class SparkToParquetSchemaConverter( if precision <= Decimal.MAX_LONG_DIGITS && !writeLegacyParquetFormat => Types .primitive(INT64, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(LogicalTypeAnnotation.decimalType(scale, precision)) 
.named(field.name) // Uses FIXED_LEN_BYTE_ARRAY for all other precisions case DecimalType.Fixed(precision, scale) if !writeLegacyParquetFormat => Types .primitive(FIXED_LEN_BYTE_ARRAY, repetition) - .as(DECIMAL) - .precision(precision) - .scale(scale) + .as(LogicalTypeAnnotation.decimalType(scale, precision)) .length(Decimal.minBytesForPrecision(precision)) .named(field.name) @@ -462,7 +487,7 @@ class SparkToParquetSchemaConverter( // `array` as its element name as below. Therefore, we build manually // the correct group type here via the builder. (See SPARK-16777) Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(LogicalTypeAnnotation.listType()) .addField(Types .buildGroup(REPEATED) // "array" is the name chosen by parquet-hive (1.7.0 and prior version) @@ -480,7 +505,7 @@ class SparkToParquetSchemaConverter( // Here too, we should not use `listOfElements`. (See SPARK-16777) Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(LogicalTypeAnnotation.listType()) // "array" is the name chosen by parquet-avro (1.7.0 and prior version) .addField(convertField(StructField("array", elementType, nullable), REPEATED)) .named(field.name) @@ -511,7 +536,7 @@ class SparkToParquetSchemaConverter( // } // } Types - .buildGroup(repetition).as(LIST) + .buildGroup(repetition).as(LogicalTypeAnnotation.listType()) .addField( Types.repeatedGroup() .addField(convertField(StructField("element", elementType, containsNull))) @@ -526,7 +551,7 @@ class SparkToParquetSchemaConverter( // } // } Types - .buildGroup(repetition).as(MAP) + .buildGroup(repetition).as(LogicalTypeAnnotation.mapType()) .addField( Types .repeatedGroup() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org