This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b2c40929dccafe76b654da095a1c385813c6d524 Author: Jark Wu <[email protected]> AuthorDate: Wed Apr 29 11:27:01 2020 +0800 [FLINK-16996][parquet] Refactor parquet connector to use new data structures This closes #11925 --- .../parquet/ParquetFileSystemFormatFactory.java | 16 ++++---- .../formats/parquet/row/ParquetRowDataBuilder.java | 18 ++++----- .../formats/parquet/row/ParquetRowDataWriter.java | 46 +++++++++++----------- .../vector/ParquetColumnarRowSplitReader.java | 14 +++---- .../parquet/vector/ParquetDecimalVector.java | 37 ++++++++++------- .../formats/parquet/vector/ParquetDictionary.java | 6 +-- .../parquet/vector/ParquetSplitReaderUtil.java | 43 ++++++++++---------- .../vector/reader/AbstractColumnReader.java | 4 +- .../parquet/vector/reader/BooleanColumnReader.java | 4 +- .../parquet/vector/reader/ByteColumnReader.java | 4 +- .../parquet/vector/reader/BytesColumnReader.java | 4 +- .../parquet/vector/reader/ColumnReader.java | 2 +- .../parquet/vector/reader/DoubleColumnReader.java | 4 +- .../vector/reader/FixedLenBytesColumnReader.java | 18 ++++----- .../parquet/vector/reader/FloatColumnReader.java | 4 +- .../parquet/vector/reader/IntColumnReader.java | 2 +- .../parquet/vector/reader/LongColumnReader.java | 4 +- .../parquet/vector/reader/RunLengthDecoder.java | 4 +- .../parquet/vector/reader/ShortColumnReader.java | 4 +- .../vector/reader/TimestampColumnReader.java | 14 +++---- .../parquet/row/ParquetRowDataWriterTest.java | 10 ++--- .../vector/ParquetColumnarRowSplitReaderTest.java | 18 ++++----- 22 files changed, 144 insertions(+), 136 deletions(-) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java index 9b28cfe..35d3778 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java @@ -29,7 +29,7 @@ import org.apache.flink.formats.parquet.row.ParquetRowDataBuilder; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader; import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil; -import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.filesystem.FileSystemFormatFactory; import org.apache.flink.table.filesystem.PartitionPathUtils; @@ -49,7 +49,7 @@ import java.util.Map; import java.util.Optional; import static org.apache.flink.configuration.ConfigOptions.key; -import static org.apache.flink.table.dataformat.vector.VectorizedColumnBatch.DEFAULT_SIZE; +import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE; import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT; import static org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType; @@ -114,7 +114,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { } @Override - public InputFormat<BaseRow, ?> createReader(ReaderContext context) { + public InputFormat<RowData, ?> createReader(ReaderContext context) { DescriptorProperties properties = new DescriptorProperties(); properties.putProperties(context.getFormatProperties()); @@ -130,7 +130,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { } @Override - public Optional<BulkWriter.Factory<BaseRow>> createBulkWriterFactory(WriterContext context) { + public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) { DescriptorProperties properties = new DescriptorProperties(); properties.putProperties(context.getFormatProperties()); @@ -145,7 +145,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { } @Override - public Optional<Encoder<BaseRow>> createEncoder(WriterContext context) { + public Optional<Encoder<RowData>> createEncoder(WriterContext context) { return Optional.empty(); } @@ -155,10 +155,10 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { } /** - * An implementation of {@link ParquetInputFormat} to read {@link BaseRow} records + * An implementation of {@link ParquetInputFormat} to read {@link RowData} records * from Parquet files. */ - public static class ParquetInputFormat extends FileInputFormat<BaseRow> { + public static class ParquetInputFormat extends FileInputFormat<RowData> { private static final long serialVersionUID = 1L; @@ -232,7 +232,7 @@ public class ParquetFileSystemFormatFactory implements FileSystemFormatFactory { } @Override - public BaseRow nextRecord(BaseRow reuse) { + public RowData nextRecord(RowData reuse) { currentReadCount++; return reader.nextRecord(); } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java index e8399f0..0c75a0a 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java @@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.BulkWriter; import org.apache.flink.formats.parquet.ParquetBuilder; import org.apache.flink.formats.parquet.ParquetWriterFactory; import org.apache.flink.formats.parquet.utils.SerializableConfiguration; -import org.apache.flink.table.dataformat.BaseRow; +import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.hadoop.conf.Configuration; @@ -46,9 +46,9 @@ import static org.apache.parquet.hadoop.ParquetOutputFormat.getWriterVersion; import static org.apache.parquet.hadoop.codec.CodecConfig.getParquetCompressionCodec; /** - * {@link BaseRow} of {@link ParquetWriter.Builder}. + * {@link RowData} of {@link ParquetWriter.Builder}. */ -public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, ParquetRowDataBuilder> { +public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, ParquetRowDataBuilder> { private final RowType rowType; private final boolean utcTimestamp; @@ -68,11 +68,11 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, Parque } @Override - protected WriteSupport<BaseRow> getWriteSupport(Configuration conf) { + protected WriteSupport<RowData> getWriteSupport(Configuration conf) { return new ParquetWriteSupport(); } - private class ParquetWriteSupport extends WriteSupport<BaseRow> { + private class ParquetWriteSupport extends WriteSupport<RowData> { private MessageType schema = convertToParquetMessageType("flink_schema", rowType); private ParquetRowDataWriter writer; @@ -92,7 +92,7 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, Parque } @Override - public void write(BaseRow record) { + public void write(RowData record) { try { this.writer.write(record); } catch (Exception e) { @@ -110,7 +110,7 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, Parque * and LocalDateTime. Hive 0.x/1.x/2.x use local timezone. But Hive 3.x * use UTC timezone. */ - public static ParquetWriterFactory<BaseRow> createWriterFactory( + public static ParquetWriterFactory<RowData> createWriterFactory( RowType rowType, Configuration conf, boolean utcTimestamp) { @@ -121,7 +121,7 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, Parque /** * Flink Row {@link ParquetBuilder}. */ - public static class FlinkParquetBuilder implements ParquetBuilder<BaseRow> { + public static class FlinkParquetBuilder implements ParquetBuilder<RowData> { private final RowType rowType; private final SerializableConfiguration configuration; @@ -137,7 +137,7 @@ public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, Parque } @Override - public ParquetWriter<BaseRow> createWriter(OutputFile out) throws IOException { + public ParquetWriter<RowData> createWriter(OutputFile out) throws IOException { Configuration conf = configuration.conf(); return new ParquetRowDataBuilder(out, rowType, utcTimestamp) .withCompressionCodec(getParquetCompressionCodec(conf)) diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java index 13d5783..3ccaaa9 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java @@ -18,9 +18,9 @@ package org.apache.flink.formats.parquet.row; -import org.apache.flink.table.dataformat.BaseRow; -import org.apache.flink.table.dataformat.Decimal; -import org.apache.flink.table.dataformat.SqlTimestamp; +import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.DecimalType; import org.apache.flink.table.types.logical.LocalZonedTimestampType; import org.apache.flink.table.types.logical.LogicalType; @@ -75,7 +75,7 @@ public class ParquetRowDataWriter { * * @param record Contains the record that is going to be written. */ - public void write(final BaseRow record) { + public void write(final RowData record) { recordConsumer.startMessage(); for (int i = 0; i < filedWriters.length; i++) { if (!record.isNullAt(i)) { @@ -134,13 +134,13 @@ public class ParquetRowDataWriter { private interface FieldWriter { - void write(BaseRow row, int ordinal); + void write(RowData row, int ordinal); } private class BooleanWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addBoolean(row.getBoolean(ordinal)); } } @@ -148,7 +148,7 @@ public class ParquetRowDataWriter { private class ByteWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addInteger(row.getByte(ordinal)); } } @@ -156,7 +156,7 @@ public class ParquetRowDataWriter { private class ShortWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addInteger(row.getShort(ordinal)); } } @@ -164,7 +164,7 @@ public class ParquetRowDataWriter { private class LongWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addLong(row.getLong(ordinal)); } } @@ -172,7 +172,7 @@ public class ParquetRowDataWriter { private class FloatWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addFloat(row.getFloat(ordinal)); } } @@ -180,7 +180,7 @@ public class ParquetRowDataWriter { private class DoubleWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addDouble(row.getDouble(ordinal)); } } @@ -188,16 +188,16 @@ public class ParquetRowDataWriter { private class StringWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addBinary( - Binary.fromReusedByteArray(row.getString(ordinal).getBytes())); + Binary.fromReusedByteArray(row.getString(ordinal).toBytes())); } } private class BinaryWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addBinary( Binary.fromReusedByteArray(row.getBinary(ordinal))); } @@ -206,7 +206,7 @@ public class ParquetRowDataWriter { private class IntWriter implements FieldWriter { @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addInteger(row.getInt(ordinal)); } } @@ -225,20 +225,20 @@ public class ParquetRowDataWriter { } @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, precision))); } } - private Binary timestampToInt96(SqlTimestamp sqlTimestamp) { + private Binary timestampToInt96(TimestampData timestampData) { int julianDay; long nanosOfDay; if (utcTimestamp) { - long mills = sqlTimestamp.getMillisecond(); + long mills = timestampData.getMillisecond(); julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS); - nanosOfDay = (mills % MILLIS_IN_DAY) * NANOS_PER_MILLISECOND + sqlTimestamp.getNanoOfMillisecond(); + nanosOfDay = (mills % MILLIS_IN_DAY) * NANOS_PER_MILLISECOND + timestampData.getNanoOfMillisecond(); } else { - Timestamp timestamp = sqlTimestamp.toTimestamp(); + Timestamp timestamp = timestampData.toTimestamp(); long mills = timestamp.getTime(); julianDay = (int) ((mills / MILLIS_IN_DAY) + JULIAN_EPOCH_OFFSET_DAYS); nanosOfDay = ((mills % MILLIS_IN_DAY) / 1000) * NANOS_PER_SECOND + timestamp.getNanos(); @@ -273,7 +273,7 @@ public class ParquetRowDataWriter { } @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { long unscaledLong = row.getDecimal(ordinal, precision, scale).toUnscaledLong(); int i = 0; int shift = initShift; @@ -297,7 +297,7 @@ public class ParquetRowDataWriter { } @Override - public void write(BaseRow row, int ordinal) { + public void write(RowData row, int ordinal) { byte[] bytes = row.getDecimal(ordinal, precision, scale).toUnscaledBytes(); byte[] writtenBytes; if (bytes.length == numBytes) { @@ -315,7 +315,7 @@ public class ParquetRowDataWriter { // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY // optimizer for UnscaledBytesWriter - if (Decimal.is32BitDecimal(precision) || Decimal.is64BitDecimal(precision)) { + if (DecimalDataUtils.is32BitDecimal(precision) || DecimalDataUtils.is64BitDecimal(precision)) { return new LongUnscaledBytesWriter(); } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java index 8335381..f892d3a 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java @@ -20,10 +20,10 @@ package org.apache.flink.formats.parquet.vector; import org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader; import org.apache.flink.formats.parquet.vector.reader.ColumnReader; -import org.apache.flink.table.dataformat.ColumnarRow; -import org.apache.flink.table.dataformat.vector.ColumnVector; -import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -73,7 +73,7 @@ public class ParquetColumnarRowSplitReader implements Closeable { private final VectorizedColumnBatch columnarBatch; - private final ColumnarRow row; + private final ColumnarRowData row; private final LogicalType[] selectedTypes; @@ -140,7 +140,7 @@ public class ParquetColumnarRowSplitReader implements Closeable { this.writableVectors = createWritableVectors(); this.columnarBatch = generator.generate(createReadableVectors()); - this.row = new ColumnarRow(columnarBatch); + this.row = new ColumnarRowData(columnarBatch); } /** @@ -222,7 +222,7 @@ public class ParquetColumnarRowSplitReader implements Closeable { return !ensureBatch(); } - public ColumnarRow nextRecord() { + public ColumnarRowData nextRecord() { // return the next row row.setRowId(this.nextRow++); return row; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java index 288aed0..88c1aa3 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java @@ -18,12 +18,13 @@ package org.apache.flink.formats.parquet.vector; -import org.apache.flink.table.dataformat.Decimal; -import org.apache.flink.table.dataformat.vector.BytesColumnVector; -import org.apache.flink.table.dataformat.vector.ColumnVector; -import org.apache.flink.table.dataformat.vector.DecimalColumnVector; -import org.apache.flink.table.dataformat.vector.IntColumnVector; -import org.apache.flink.table.dataformat.vector.LongColumnVector; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.table.data.vector.BytesColumnVector; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.DecimalColumnVector; +import org.apache.flink.table.data.vector.IntColumnVector; +import org.apache.flink.table.data.vector.LongColumnVector; /** * Parquet write decimal as int32 and int64 and binary, this class wrap the real vector to @@ -38,16 +39,22 @@ public class ParquetDecimalVector implements DecimalColumnVector { } @Override - public Decimal getDecimal(int i, int precision, int scale) { - if (Decimal.is32BitDecimal(precision)) { - return Decimal.fromUnscaledLong( - precision, scale, ((IntColumnVector) vector).getInt(i)); - } else if (Decimal.is64BitDecimal(precision)) { - return Decimal.fromUnscaledLong( - precision, scale, ((LongColumnVector) vector).getLong(i)); + public DecimalData getDecimal(int i, int precision, int scale) { + if (DecimalDataUtils.is32BitDecimal(precision)) { + return DecimalData.fromUnscaledLong( + ((IntColumnVector) vector).getInt(i), + precision, + scale); + } else if (DecimalDataUtils.is64BitDecimal(precision)) { + return DecimalData.fromUnscaledLong( + ((LongColumnVector) vector).getLong(i), + precision, + scale); } else { - return Decimal.fromUnscaledBytes( - precision, scale, ((BytesColumnVector) vector).getBytes(i).getBytes()); + return DecimalData.fromUnscaledBytes( + ((BytesColumnVector) vector).getBytes(i).getBytes(), + precision, + scale); } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java index c93025e..b574837 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.parquet.vector; -import org.apache.flink.table.dataformat.SqlTimestamp; -import org.apache.flink.table.dataformat.vector.Dictionary; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.vector.Dictionary; import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt96ToTimestamp; @@ -60,7 +60,7 @@ public final class ParquetDictionary implements Dictionary { } @Override - public SqlTimestamp decodeToTimestamp(int id) { + public TimestampData decodeToTimestamp(int id) { return decodeInt96ToTimestamp(true, dictionary, id); } } diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java index dab7ff9..8be1e2f 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java @@ -30,20 +30,21 @@ import org.apache.flink.formats.parquet.vector.reader.IntColumnReader; import org.apache.flink.formats.parquet.vector.reader.LongColumnReader; import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader; import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader; -import org.apache.flink.table.dataformat.Decimal; -import org.apache.flink.table.dataformat.SqlTimestamp; -import org.apache.flink.table.dataformat.vector.ColumnVector; -import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; -import org.apache.flink.table.dataformat.vector.heap.HeapBooleanVector; -import org.apache.flink.table.dataformat.vector.heap.HeapByteVector; -import org.apache.flink.table.dataformat.vector.heap.HeapBytesVector; -import org.apache.flink.table.dataformat.vector.heap.HeapDoubleVector; -import org.apache.flink.table.dataformat.vector.heap.HeapFloatVector; -import org.apache.flink.table.dataformat.vector.heap.HeapIntVector; -import org.apache.flink.table.dataformat.vector.heap.HeapLongVector; -import org.apache.flink.table.dataformat.vector.heap.HeapShortVector; -import org.apache.flink.table.dataformat.vector.heap.HeapTimestampVector; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.vector.ColumnVector; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; +import org.apache.flink.table.data.vector.heap.HeapBooleanVector; +import org.apache.flink.table.data.vector.heap.HeapByteVector; +import org.apache.flink.table.data.vector.heap.HeapBytesVector; +import org.apache.flink.table.data.vector.heap.HeapDoubleVector; +import org.apache.flink.table.data.vector.heap.HeapFloatVector; +import org.apache.flink.table.data.vector.heap.HeapIntVector; +import org.apache.flink.table.data.vector.heap.HeapLongVector; +import org.apache.flink.table.data.vector.heap.HeapShortVector; +import org.apache.flink.table.data.vector.heap.HeapTimestampVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.DecimalType; @@ -192,15 +193,15 @@ public class ParquetSplitReaderUtil { DecimalType decimalType = (DecimalType) type; int precision = decimalType.getPrecision(); int scale = decimalType.getScale(); - Decimal decimal = value == null ? null : Preconditions.checkNotNull( - Decimal.fromBigDecimal((BigDecimal) value, precision, scale)); + DecimalData decimal = value == null ? null : Preconditions.checkNotNull( + DecimalData.fromBigDecimal((BigDecimal) value, precision, scale)); ColumnVector internalVector; - if (Decimal.is32BitDecimal(precision)) { + if (DecimalDataUtils.is32BitDecimal(precision)) { internalVector = createVectorFromConstant( new IntType(), decimal == null ? null : (int) decimal.toUnscaledLong(), batchSize); - } else if (Decimal.is64BitDecimal(precision)) { + } else if (DecimalDataUtils.is64BitDecimal(precision)) { internalVector = createVectorFromConstant( new BigIntType(), decimal == null ? null : decimal.toUnscaledLong(), @@ -241,7 +242,7 @@ public class ParquetSplitReaderUtil { if (value == null) { tv.fillWithNulls(); } else { - tv.fill(SqlTimestamp.fromLocalDateTime((LocalDateTime) value)); + tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value)); } return tv; default: @@ -355,14 +356,14 @@ public class ParquetSplitReaderUtil { return new HeapTimestampVector(batchSize); case DECIMAL: DecimalType decimalType = (DecimalType) fieldType; - if (Decimal.is32BitDecimal(decimalType.getPrecision())) { + if (DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) { checkArgument( (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || typeName == PrimitiveType.PrimitiveTypeName.INT32) && primitiveType.getOriginalType() == OriginalType.DECIMAL, "Unexpected type: %s", typeName); return new HeapIntVector(batchSize); - } else if (Decimal.is64BitDecimal(decimalType.getPrecision())) { + } else if (DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) { checkArgument( (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY || typeName == PrimitiveType.PrimitiveTypeName.INT64) && diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java index 786f546..6bbaf06 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java @@ -18,8 +18,8 @@ package org.apache.flink.formats.parquet.vector.reader; import org.apache.flink.formats.parquet.vector.ParquetDictionary; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java index 504a504..6c4dfea 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableBooleanVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableBooleanVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java index ada3405..9034514 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableByteVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableByteVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java index 3f6a135..261ae92 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableBytesVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableBytesVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java index f05dc6c..3adea5d 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java @@ -17,7 +17,7 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; import java.io.IOException; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java index 4999c72..00e8d95 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableDoubleVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableDoubleVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java index 835562e..5874073 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java @@ -17,11 +17,11 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.Decimal; -import org.apache.flink.table.dataformat.vector.writable.WritableBytesVector; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; -import org.apache.flink.table.dataformat.vector.writable.WritableLongVector; +import org.apache.flink.table.data.DecimalDataUtils; +import org.apache.flink.table.data.vector.writable.WritableBytesVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableLongVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; @@ -50,7 +50,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> exte @Override protected void readBatch(int rowId, int num, VECTOR column) { int bytesLen = descriptor.getPrimitiveType().getTypeLength(); - if (Decimal.is32BitDecimal(precision)) { + if (DecimalDataUtils.is32BitDecimal(precision)) { WritableIntVector intVector = (WritableIntVector) column; for (int i = 0; i < num; i++) { if (runLenDecoder.readInteger() == maxDefLevel) { @@ -59,7 +59,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> exte intVector.setNullAt(rowId + i); } } - } else if (Decimal.is64BitDecimal(precision)) { + } else if (DecimalDataUtils.is64BitDecimal(precision)) { WritableLongVector longVector = (WritableLongVector) column; for (int i = 0; i < num; i++) { if (runLenDecoder.readInteger() == maxDefLevel) { @@ -87,7 +87,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> exte int num, VECTOR column, WritableIntVector dictionaryIds) { - if (Decimal.is32BitDecimal(precision)) { + if (DecimalDataUtils.is32BitDecimal(precision)) { WritableIntVector intVector = (WritableIntVector) column; for (int i = rowId; i < rowId + num; ++i) { if (!intVector.isNullAt(i)) { @@ -95,7 +95,7 @@ public class FixedLenBytesColumnReader<VECTOR extends WritableColumnVector> exte intVector.setInt(i, (int) heapBinaryToLong(v)); } } - } else if (Decimal.is64BitDecimal(precision)) { + } else if (DecimalDataUtils.is64BitDecimal(precision)) { WritableLongVector longVector = (WritableLongVector) column; for (int i = rowId; i < rowId + num; ++i) { if (!longVector.isNullAt(i)) { diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java index 71b794a..0a2b14d 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableFloatVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableFloatVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java index 9b07367..c0b2645 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java @@ -17,7 +17,7 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java index b68a732..bffe238 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; -import org.apache.flink.table.dataformat.vector.writable.WritableLongVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableLongVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java index 3337f6d..6eb6298 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableColumnVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java index a98051d..0e3f83c 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java @@ -17,8 +17,8 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; -import org.apache.flink.table.dataformat.vector.writable.WritableShortVector; +import org.apache.flink.table.data.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableShortVector; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReader; diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java index 256ae72..21006a2 100644 --- a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java @@ -17,9 +17,9 @@ package org.apache.flink.formats.parquet.vector.reader; -import org.apache.flink.table.dataformat.SqlTimestamp; -import org.apache.flink.table.dataformat.vector.writable.WritableIntVector; -import org.apache.flink.table.dataformat.vector.writable.WritableTimestampVector; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.vector.writable.WritableIntVector; +import org.apache.flink.table.data.vector.writable.WritableTimestampVector; import org.apache.parquet.Preconditions; import org.apache.parquet.column.ColumnDescriptor; @@ -88,7 +88,7 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam } } - public static SqlTimestamp decodeInt96ToTimestamp( + public static TimestampData decodeInt96ToTimestamp( boolean utcTimestamp, org.apache.parquet.column.Dictionary dictionary, int id) { @@ -100,17 +100,17 @@ public class TimestampColumnReader extends AbstractColumnReader<WritableTimestam return int96ToTimestamp(utcTimestamp, buffer.getLong(), buffer.getInt()); } - private static SqlTimestamp int96ToTimestamp( + private static TimestampData int96ToTimestamp( boolean utcTimestamp, long nanosOfDay, int julianDay) { long millisecond = julianDayToMillis(julianDay) + (nanosOfDay / NANOS_PER_MILLISECOND); if (utcTimestamp) { int nanoOfMillisecond = (int) (nanosOfDay % NANOS_PER_MILLISECOND); - return SqlTimestamp.fromEpochMillis(millisecond, nanoOfMillisecond); + return TimestampData.fromEpochMillis(millisecond, nanoOfMillisecond); } else { Timestamp timestamp = new Timestamp(millisecond); timestamp.setNanos((int) (nanosOfDay % NANOS_PER_SECOND)); - return SqlTimestamp.fromTimestamp(timestamp); + return TimestampData.fromTimestamp(timestamp); } } diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java index 508a36c..eecf1b8 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java @@ -24,8 +24,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.formats.parquet.ParquetWriterFactory; import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader; import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil; -import org.apache.flink.table.dataformat.BaseRow; -import org.apache.flink.table.dataformat.DataFormatConverters; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.util.DataFormatConverters; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.BigIntType; import org.apache.flink.table.types.logical.BooleanType; @@ -83,7 +83,7 @@ public class ParquetRowDataWriterTest { new DecimalType(20, 0)); @SuppressWarnings("unchecked") - private static final DataFormatConverters.DataFormatConverter<BaseRow, Row> CONVERTER = + private static final DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER = DataFormatConverters.getConverterForDataType( TypeConversions.fromLogicalToDataType(ROW_TYPE)); @@ -126,9 +126,9 @@ public class ParquetRowDataWriterTest { BigDecimal.valueOf(v))); } - ParquetWriterFactory<BaseRow> factory = ParquetRowDataBuilder.createWriterFactory( + ParquetWriterFactory<RowData> factory = ParquetRowDataBuilder.createWriterFactory( ROW_TYPE, conf, utcTimestamp); - BulkWriter<BaseRow> writer = factory.create(path.getFileSystem().create( + BulkWriter<RowData> writer = factory.create(path.getFileSystem().create( path, FileSystem.WriteMode.OVERWRITE)); for (int i = 0; i < number; i++) { writer.addElement(CONVERTER.toInternal(rows.get(i))); diff --git a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java index 758c267..d2098d5 100644 --- a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java +++ b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java @@ -19,9 +19,9 @@ package org.apache.flink.formats.parquet.vector; import org.apache.flink.core.fs.Path; -import org.apache.flink.table.dataformat.ColumnarRow; -import org.apache.flink.table.dataformat.Decimal; -import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch; +import org.apache.flink.table.data.ColumnarRowData; +import org.apache.flink.table.data.DecimalData; +import org.apache.flink.table.data.vector.VectorizedColumnBatch; import org.apache.flink.table.runtime.functions.SqlDateTimeUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.BigIntType; @@ -212,7 +212,7 @@ public class ParquetColumnarRowSplitReaderTest { int i = start; while (!reader.reachedEnd()) { - ColumnarRow row = reader.nextRecord(); + ColumnarRowData row = reader.nextRecord(); Integer v = values.get(i); if (v == null) { assertTrue(row.isNullAt(0)); @@ -405,7 +405,7 @@ public class ParquetColumnarRowSplitReaderTest { Long.MAX_VALUE); int i = 0; while (!reader.reachedEnd()) { - ColumnarRow row = reader.nextRecord(); + ColumnarRowData row = reader.nextRecord(); assertEquals(i, row.getDouble(0), 0); assertEquals((byte) i, row.getByte(1)); assertEquals(i, row.getInt(2)); @@ -500,7 +500,7 @@ public class ParquetColumnarRowSplitReaderTest { Long.MAX_VALUE); int i = 0; while (!reader.reachedEnd()) { - ColumnarRow row = reader.nextRecord(); + ColumnarRowData row = reader.nextRecord(); // common values assertEquals(i, row.getDouble(0), 0); @@ -527,13 +527,13 @@ public class ParquetColumnarRowSplitReaderTest { LocalDateTime.of(1999, 1, 1, 1, 1), row.getTimestamp(11, 9).toLocalDateTime()); assertEquals( - Decimal.fromBigDecimal(new BigDecimal(24), 5, 0), + DecimalData.fromBigDecimal(new BigDecimal(24), 5, 0), row.getDecimal(12, 5, 0)); assertEquals( - Decimal.fromBigDecimal(new BigDecimal(25), 15, 0), + DecimalData.fromBigDecimal(new BigDecimal(25), 15, 0), row.getDecimal(13, 15, 0)); assertEquals( - Decimal.fromBigDecimal(new BigDecimal(26), 20, 0), + DecimalData.fromBigDecimal(new BigDecimal(26), 20, 0), row.getDecimal(14, 20, 0)); assertEquals("f27", row.getString(15).toString()); }
