This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b2c40929dccafe76b654da095a1c385813c6d524
Author: Jark Wu <[email protected]>
AuthorDate: Wed Apr 29 11:27:01 2020 +0800

    [FLINK-16996][parquet] Refactor parquet connector to use new data structures
    
    This closes #11925
---
 .../parquet/ParquetFileSystemFormatFactory.java    | 16 ++++----
 .../formats/parquet/row/ParquetRowDataBuilder.java | 18 ++++-----
 .../formats/parquet/row/ParquetRowDataWriter.java  | 46 +++++++++++-----------
 .../vector/ParquetColumnarRowSplitReader.java      | 14 +++----
 .../parquet/vector/ParquetDecimalVector.java       | 37 ++++++++++-------
 .../formats/parquet/vector/ParquetDictionary.java  |  6 +--
 .../parquet/vector/ParquetSplitReaderUtil.java     | 43 ++++++++++----------
 .../vector/reader/AbstractColumnReader.java        |  4 +-
 .../parquet/vector/reader/BooleanColumnReader.java |  4 +-
 .../parquet/vector/reader/ByteColumnReader.java    |  4 +-
 .../parquet/vector/reader/BytesColumnReader.java   |  4 +-
 .../parquet/vector/reader/ColumnReader.java        |  2 +-
 .../parquet/vector/reader/DoubleColumnReader.java  |  4 +-
 .../vector/reader/FixedLenBytesColumnReader.java   | 18 ++++-----
 .../parquet/vector/reader/FloatColumnReader.java   |  4 +-
 .../parquet/vector/reader/IntColumnReader.java     |  2 +-
 .../parquet/vector/reader/LongColumnReader.java    |  4 +-
 .../parquet/vector/reader/RunLengthDecoder.java    |  4 +-
 .../parquet/vector/reader/ShortColumnReader.java   |  4 +-
 .../vector/reader/TimestampColumnReader.java       | 14 +++----
 .../parquet/row/ParquetRowDataWriterTest.java      | 10 ++---
 .../vector/ParquetColumnarRowSplitReaderTest.java  | 18 ++++-----
 22 files changed, 144 insertions(+), 136 deletions(-)

diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
index 9b28cfe..35d3778 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
@@ -29,7 +29,7 @@ import 
org.apache.flink.formats.parquet.row.ParquetRowDataBuilder;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
 import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
 import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
-import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.filesystem.FileSystemFormatFactory;
 import org.apache.flink.table.filesystem.PartitionPathUtils;
@@ -49,7 +49,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
-import static 
org.apache.flink.table.dataformat.vector.VectorizedColumnBatch.DEFAULT_SIZE;
+import static 
org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
 import static 
org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT;
 import static 
org.apache.flink.table.filesystem.RowPartitionComputer.restorePartValueFromType;
 
@@ -114,7 +114,7 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
        }
 
        @Override
-       public InputFormat<BaseRow, ?> createReader(ReaderContext context) {
+       public InputFormat<RowData, ?> createReader(ReaderContext context) {
                DescriptorProperties properties = new DescriptorProperties();
                properties.putProperties(context.getFormatProperties());
 
@@ -130,7 +130,7 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
        }
 
        @Override
-       public Optional<BulkWriter.Factory<BaseRow>> 
createBulkWriterFactory(WriterContext context) {
+       public Optional<BulkWriter.Factory<RowData>> 
createBulkWriterFactory(WriterContext context) {
                DescriptorProperties properties = new DescriptorProperties();
                properties.putProperties(context.getFormatProperties());
 
@@ -145,7 +145,7 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
        }
 
        @Override
-       public Optional<Encoder<BaseRow>> createEncoder(WriterContext context) {
+       public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
                return Optional.empty();
        }
 
@@ -155,10 +155,10 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
        }
 
        /**
-        * An implementation of {@link ParquetInputFormat} to read {@link 
BaseRow} records
+        * An implementation of {@link ParquetInputFormat} to read {@link 
RowData} records
         * from Parquet files.
         */
-       public static class ParquetInputFormat extends FileInputFormat<BaseRow> 
{
+       public static class ParquetInputFormat extends FileInputFormat<RowData> 
{
 
                private static final long serialVersionUID = 1L;
 
@@ -232,7 +232,7 @@ public class ParquetFileSystemFormatFactory implements 
FileSystemFormatFactory {
                }
 
                @Override
-               public BaseRow nextRecord(BaseRow reuse) {
+               public RowData nextRecord(RowData reuse) {
                        currentReadCount++;
                        return reader.nextRecord();
                }
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
index e8399f0..0c75a0a 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataBuilder.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.formats.parquet.ParquetBuilder;
 import org.apache.flink.formats.parquet.ParquetWriterFactory;
 import org.apache.flink.formats.parquet.utils.SerializableConfiguration;
-import org.apache.flink.table.dataformat.BaseRow;
+import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
 import org.apache.hadoop.conf.Configuration;
@@ -46,9 +46,9 @@ import static 
org.apache.parquet.hadoop.ParquetOutputFormat.getWriterVersion;
 import static 
org.apache.parquet.hadoop.codec.CodecConfig.getParquetCompressionCodec;
 
 /**
- * {@link BaseRow} of {@link ParquetWriter.Builder}.
+ * {@link RowData} of {@link ParquetWriter.Builder}.
  */
-public class ParquetRowDataBuilder extends ParquetWriter.Builder<BaseRow, 
ParquetRowDataBuilder> {
+public class ParquetRowDataBuilder extends ParquetWriter.Builder<RowData, 
ParquetRowDataBuilder> {
 
        private final RowType rowType;
        private final boolean utcTimestamp;
@@ -68,11 +68,11 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<BaseRow, Parque
        }
 
        @Override
-       protected WriteSupport<BaseRow> getWriteSupport(Configuration conf) {
+       protected WriteSupport<RowData> getWriteSupport(Configuration conf) {
                return new ParquetWriteSupport();
        }
 
-       private class ParquetWriteSupport extends WriteSupport<BaseRow> {
+       private class ParquetWriteSupport extends WriteSupport<RowData> {
 
                private MessageType schema = 
convertToParquetMessageType("flink_schema", rowType);
                private ParquetRowDataWriter writer;
@@ -92,7 +92,7 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<BaseRow, Parque
                }
 
                @Override
-               public void write(BaseRow record) {
+               public void write(RowData record) {
                        try {
                                this.writer.write(record);
                        } catch (Exception e) {
@@ -110,7 +110,7 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<BaseRow, Parque
         *                     and LocalDateTime. Hive 0.x/1.x/2.x use local 
timezone. But Hive 3.x
         *                     use UTC timezone.
         */
-       public static ParquetWriterFactory<BaseRow> createWriterFactory(
+       public static ParquetWriterFactory<RowData> createWriterFactory(
                        RowType rowType,
                        Configuration conf,
                        boolean utcTimestamp) {
@@ -121,7 +121,7 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<BaseRow, Parque
        /**
         * Flink Row {@link ParquetBuilder}.
         */
-       public static class FlinkParquetBuilder implements 
ParquetBuilder<BaseRow> {
+       public static class FlinkParquetBuilder implements 
ParquetBuilder<RowData> {
 
                private final RowType rowType;
                private final SerializableConfiguration configuration;
@@ -137,7 +137,7 @@ public class ParquetRowDataBuilder extends 
ParquetWriter.Builder<BaseRow, Parque
                }
 
                @Override
-               public ParquetWriter<BaseRow> createWriter(OutputFile out) 
throws IOException {
+               public ParquetWriter<RowData> createWriter(OutputFile out) 
throws IOException {
                        Configuration conf = configuration.conf();
                        return new ParquetRowDataBuilder(out, rowType, 
utcTimestamp)
                                        
.withCompressionCodec(getParquetCompressionCodec(conf))
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
index 13d5783..3ccaaa9 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriter.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.formats.parquet.row;
 
-import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.dataformat.SqlTimestamp;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -75,7 +75,7 @@ public class ParquetRowDataWriter {
         *
         * @param record Contains the record that is going to be written.
         */
-       public void write(final BaseRow record) {
+       public void write(final RowData record) {
                recordConsumer.startMessage();
                for (int i = 0; i < filedWriters.length; i++) {
                        if (!record.isNullAt(i)) {
@@ -134,13 +134,13 @@ public class ParquetRowDataWriter {
 
        private interface FieldWriter {
 
-               void write(BaseRow row, int ordinal);
+               void write(RowData row, int ordinal);
        }
 
        private class BooleanWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addBoolean(row.getBoolean(ordinal));
                }
        }
@@ -148,7 +148,7 @@ public class ParquetRowDataWriter {
        private class ByteWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addInteger(row.getByte(ordinal));
                }
        }
@@ -156,7 +156,7 @@ public class ParquetRowDataWriter {
        private class ShortWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addInteger(row.getShort(ordinal));
                }
        }
@@ -164,7 +164,7 @@ public class ParquetRowDataWriter {
        private class LongWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addLong(row.getLong(ordinal));
                }
        }
@@ -172,7 +172,7 @@ public class ParquetRowDataWriter {
        private class FloatWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addFloat(row.getFloat(ordinal));
                }
        }
@@ -180,7 +180,7 @@ public class ParquetRowDataWriter {
        private class DoubleWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addDouble(row.getDouble(ordinal));
                }
        }
@@ -188,16 +188,16 @@ public class ParquetRowDataWriter {
        private class StringWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addBinary(
-                               
Binary.fromReusedByteArray(row.getString(ordinal).getBytes()));
+                               
Binary.fromReusedByteArray(row.getString(ordinal).toBytes()));
                }
        }
 
        private class BinaryWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addBinary(
                                        
Binary.fromReusedByteArray(row.getBinary(ordinal)));
                }
@@ -206,7 +206,7 @@ public class ParquetRowDataWriter {
        private class IntWriter implements FieldWriter {
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        recordConsumer.addInteger(row.getInt(ordinal));
                }
        }
@@ -225,20 +225,20 @@ public class ParquetRowDataWriter {
                }
 
                @Override
-               public void write(BaseRow row, int ordinal) {
+               public void write(RowData row, int ordinal) {
                        
recordConsumer.addBinary(timestampToInt96(row.getTimestamp(ordinal, 
precision)));
                }
        }
 
-       private Binary timestampToInt96(SqlTimestamp sqlTimestamp) {
+       private Binary timestampToInt96(TimestampData timestampData) {
                int julianDay;
                long nanosOfDay;
                if (utcTimestamp) {
-                       long mills = sqlTimestamp.getMillisecond();
+                       long mills = timestampData.getMillisecond();
                        julianDay = (int) ((mills / MILLIS_IN_DAY) + 
JULIAN_EPOCH_OFFSET_DAYS);
-                       nanosOfDay = (mills % MILLIS_IN_DAY) * 
NANOS_PER_MILLISECOND + sqlTimestamp.getNanoOfMillisecond();
+                       nanosOfDay = (mills % MILLIS_IN_DAY) * 
NANOS_PER_MILLISECOND + timestampData.getNanoOfMillisecond();
                } else {
-                       Timestamp timestamp = sqlTimestamp.toTimestamp();
+                       Timestamp timestamp = timestampData.toTimestamp();
                        long mills = timestamp.getTime();
                        julianDay = (int) ((mills / MILLIS_IN_DAY) + 
JULIAN_EPOCH_OFFSET_DAYS);
                        nanosOfDay = ((mills % MILLIS_IN_DAY) / 1000) * 
NANOS_PER_SECOND + timestamp.getNanos();
@@ -273,7 +273,7 @@ public class ParquetRowDataWriter {
                        }
 
                        @Override
-                       public void write(BaseRow row, int ordinal) {
+                       public void write(RowData row, int ordinal) {
                                long unscaledLong = row.getDecimal(ordinal, 
precision, scale).toUnscaledLong();
                                int i = 0;
                                int shift = initShift;
@@ -297,7 +297,7 @@ public class ParquetRowDataWriter {
                        }
 
                        @Override
-                       public void write(BaseRow row, int ordinal) {
+                       public void write(RowData row, int ordinal) {
                                byte[] bytes = row.getDecimal(ordinal, 
precision, scale).toUnscaledBytes();
                                byte[] writtenBytes;
                                if (bytes.length == numBytes) {
@@ -315,7 +315,7 @@ public class ParquetRowDataWriter {
 
                // 1 <= precision <= 18, writes as FIXED_LEN_BYTE_ARRAY
                // optimizer for UnscaledBytesWriter
-               if (Decimal.is32BitDecimal(precision) || 
Decimal.is64BitDecimal(precision)) {
+               if (DecimalDataUtils.is32BitDecimal(precision) || 
DecimalDataUtils.is64BitDecimal(precision)) {
                        return new LongUnscaledBytesWriter();
                }
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
index 8335381..f892d3a 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReader.java
@@ -20,10 +20,10 @@ package org.apache.flink.formats.parquet.vector;
 
 import org.apache.flink.formats.parquet.vector.reader.AbstractColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.dataformat.ColumnarRow;
-import org.apache.flink.table.dataformat.vector.ColumnVector;
-import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
 
@@ -73,7 +73,7 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
 
        private final VectorizedColumnBatch columnarBatch;
 
-       private final ColumnarRow row;
+       private final ColumnarRowData row;
 
        private final LogicalType[] selectedTypes;
 
@@ -140,7 +140,7 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
 
                this.writableVectors = createWritableVectors();
                this.columnarBatch = 
generator.generate(createReadableVectors());
-               this.row = new ColumnarRow(columnarBatch);
+               this.row = new ColumnarRowData(columnarBatch);
        }
 
        /**
@@ -222,7 +222,7 @@ public class ParquetColumnarRowSplitReader implements 
Closeable {
                return !ensureBatch();
        }
 
-       public ColumnarRow nextRecord() {
+       public ColumnarRowData nextRecord() {
                // return the next row
                row.setRowId(this.nextRow++);
                return row;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
index 288aed0..88c1aa3 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDecimalVector.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.formats.parquet.vector;
 
-import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.dataformat.vector.BytesColumnVector;
-import org.apache.flink.table.dataformat.vector.ColumnVector;
-import org.apache.flink.table.dataformat.vector.DecimalColumnVector;
-import org.apache.flink.table.dataformat.vector.IntColumnVector;
-import org.apache.flink.table.dataformat.vector.LongColumnVector;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.vector.BytesColumnVector;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.DecimalColumnVector;
+import org.apache.flink.table.data.vector.IntColumnVector;
+import org.apache.flink.table.data.vector.LongColumnVector;
 
 /**
  * Parquet write decimal as int32 and int64 and binary, this class wrap the 
real vector to
@@ -38,16 +39,22 @@ public class ParquetDecimalVector implements 
DecimalColumnVector {
        }
 
        @Override
-       public Decimal getDecimal(int i, int precision, int scale) {
-               if (Decimal.is32BitDecimal(precision)) {
-                       return Decimal.fromUnscaledLong(
-                                       precision, scale, ((IntColumnVector) 
vector).getInt(i));
-               } else if (Decimal.is64BitDecimal(precision)) {
-                       return Decimal.fromUnscaledLong(
-                                       precision, scale, ((LongColumnVector) 
vector).getLong(i));
+       public DecimalData getDecimal(int i, int precision, int scale) {
+               if (DecimalDataUtils.is32BitDecimal(precision)) {
+                       return DecimalData.fromUnscaledLong(
+                               ((IntColumnVector) vector).getInt(i),
+                               precision,
+                               scale);
+               } else if (DecimalDataUtils.is64BitDecimal(precision)) {
+                       return DecimalData.fromUnscaledLong(
+                               ((LongColumnVector) vector).getLong(i),
+                               precision,
+                               scale);
                } else {
-                       return Decimal.fromUnscaledBytes(
-                                       precision, scale, ((BytesColumnVector) 
vector).getBytes(i).getBytes());
+                       return DecimalData.fromUnscaledBytes(
+                               ((BytesColumnVector) 
vector).getBytes(i).getBytes(),
+                               precision,
+                               scale);
                }
        }
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
index c93025e..b574837 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetDictionary.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.formats.parquet.vector;
 
-import org.apache.flink.table.dataformat.SqlTimestamp;
-import org.apache.flink.table.dataformat.vector.Dictionary;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.Dictionary;
 
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.decodeInt96ToTimestamp;
 
@@ -60,7 +60,7 @@ public final class ParquetDictionary implements Dictionary {
        }
 
        @Override
-       public SqlTimestamp decodeToTimestamp(int id) {
+       public TimestampData decodeToTimestamp(int id) {
                return decodeInt96ToTimestamp(true, dictionary, id);
        }
 }
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
index dab7ff9..8be1e2f 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/ParquetSplitReaderUtil.java
@@ -30,20 +30,21 @@ import 
org.apache.flink.formats.parquet.vector.reader.IntColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.LongColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.ShortColumnReader;
 import org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader;
-import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.dataformat.SqlTimestamp;
-import org.apache.flink.table.dataformat.vector.ColumnVector;
-import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
-import org.apache.flink.table.dataformat.vector.heap.HeapBooleanVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapByteVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapBytesVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapDoubleVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapFloatVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapIntVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapLongVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapShortVector;
-import org.apache.flink.table.dataformat.vector.heap.HeapTimestampVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.ColumnVector;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.DecimalType;
@@ -192,15 +193,15 @@ public class ParquetSplitReaderUtil {
                                DecimalType decimalType = (DecimalType) type;
                                int precision = decimalType.getPrecision();
                                int scale = decimalType.getScale();
-                               Decimal decimal = value == null ? null : 
Preconditions.checkNotNull(
-                                               
Decimal.fromBigDecimal((BigDecimal) value, precision, scale));
+                               DecimalData decimal = value == null ? null : 
Preconditions.checkNotNull(
+                                               
DecimalData.fromBigDecimal((BigDecimal) value, precision, scale));
                                ColumnVector internalVector;
-                               if (Decimal.is32BitDecimal(precision)) {
+                               if (DecimalDataUtils.is32BitDecimal(precision)) 
{
                                        internalVector = 
createVectorFromConstant(
                                                        new IntType(),
                                                        decimal == null ? null 
: (int) decimal.toUnscaledLong(),
                                                        batchSize);
-                               } else if (Decimal.is64BitDecimal(precision)) {
+                               } else if 
(DecimalDataUtils.is64BitDecimal(precision)) {
                                        internalVector = 
createVectorFromConstant(
                                                        new BigIntType(),
                                                        decimal == null ? null 
: decimal.toUnscaledLong(),
@@ -241,7 +242,7 @@ public class ParquetSplitReaderUtil {
                                if (value == null) {
                                        tv.fillWithNulls();
                                } else {
-                                       
tv.fill(SqlTimestamp.fromLocalDateTime((LocalDateTime) value));
+                                       
tv.fill(TimestampData.fromLocalDateTime((LocalDateTime) value));
                                }
                                return tv;
                        default:
@@ -355,14 +356,14 @@ public class ParquetSplitReaderUtil {
                                return new HeapTimestampVector(batchSize);
                        case DECIMAL:
                                DecimalType decimalType = (DecimalType) 
fieldType;
-                               if 
(Decimal.is32BitDecimal(decimalType.getPrecision())) {
+                               if 
(DecimalDataUtils.is32BitDecimal(decimalType.getPrecision())) {
                                        checkArgument(
                                                        (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ||
                                                                        
typeName == PrimitiveType.PrimitiveTypeName.INT32) &&
                                                                        
primitiveType.getOriginalType() == OriginalType.DECIMAL,
                                                        "Unexpected type: %s", 
typeName);
                                        return new HeapIntVector(batchSize);
-                               } else if 
(Decimal.is64BitDecimal(decimalType.getPrecision())) {
+                               } else if 
(DecimalDataUtils.is64BitDecimal(decimalType.getPrecision())) {
                                        checkArgument(
                                                        (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ||
                                                                        
typeName == PrimitiveType.PrimitiveTypeName.INT64) &&
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
index 786f546..6bbaf06 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/AbstractColumnReader.java
@@ -18,8 +18,8 @@
 package org.apache.flink.formats.parquet.vector.reader;
 
 import org.apache.flink.formats.parquet.vector.ParquetDictionary;
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java
index 504a504..6c4dfea 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BooleanColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableBooleanVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableBooleanVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java
index ada3405..9034514 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ByteColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableByteVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableByteVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java
index 3f6a135..261ae92 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/BytesColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableBytesVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableBytesVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java
index f05dc6c..3adea5d 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ColumnReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
 
 import java.io.IOException;
 
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java
index 4999c72..00e8d95 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/DoubleColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableDoubleVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableDoubleVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
index 835562e..5874073 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FixedLenBytesColumnReader.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.dataformat.vector.writable.WritableBytesVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableLongVector;
+import org.apache.flink.table.data.DecimalDataUtils;
+import org.apache.flink.table.data.vector.writable.WritableBytesVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableLongVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
@@ -50,7 +50,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector> exte
        @Override
        protected void readBatch(int rowId, int num, VECTOR column) {
                int bytesLen = descriptor.getPrimitiveType().getTypeLength();
-               if (Decimal.is32BitDecimal(precision)) {
+               if (DecimalDataUtils.is32BitDecimal(precision)) {
                        WritableIntVector intVector = (WritableIntVector) 
column;
                        for (int i = 0; i < num; i++) {
                                if (runLenDecoder.readInteger() == maxDefLevel) 
{
@@ -59,7 +59,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector> exte
                                        intVector.setNullAt(rowId + i);
                                }
                        }
-               } else if (Decimal.is64BitDecimal(precision)) {
+               } else if (DecimalDataUtils.is64BitDecimal(precision)) {
                        WritableLongVector longVector = (WritableLongVector) 
column;
                        for (int i = 0; i < num; i++) {
                                if (runLenDecoder.readInteger() == maxDefLevel) 
{
@@ -87,7 +87,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector> exte
                        int num,
                        VECTOR column,
                        WritableIntVector dictionaryIds) {
-               if (Decimal.is32BitDecimal(precision)) {
+               if (DecimalDataUtils.is32BitDecimal(precision)) {
                        WritableIntVector intVector = (WritableIntVector) 
column;
                        for (int i = rowId; i < rowId + num; ++i) {
                                if (!intVector.isNullAt(i)) {
@@ -95,7 +95,7 @@ public class FixedLenBytesColumnReader<VECTOR extends 
WritableColumnVector> exte
                                        intVector.setInt(i, (int) 
heapBinaryToLong(v));
                                }
                        }
-               } else if (Decimal.is64BitDecimal(precision)) {
+               } else if (DecimalDataUtils.is64BitDecimal(precision)) {
                        WritableLongVector longVector = (WritableLongVector) 
column;
                        for (int i = rowId; i < rowId + num; ++i) {
                                if (!longVector.isNullAt(i)) {
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java
index 71b794a..0a2b14d 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/FloatColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableFloatVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableFloatVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java
index 9b07367..c0b2645 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/IntColumnReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java
index b68a732..bffe238 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/LongColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableLongVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableLongVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java
index 3337f6d..6eb6298 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/RunLengthDecoder.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableColumnVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableColumnVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java
index a98051d..0e3f83c 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/ShortColumnReader.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
-import org.apache.flink.table.dataformat.vector.writable.WritableShortVector;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableShortVector;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReader;
diff --git 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
index 256ae72..21006a2 100644
--- 
a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
+++ 
b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/vector/reader/TimestampColumnReader.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.formats.parquet.vector.reader;
 
-import org.apache.flink.table.dataformat.SqlTimestamp;
-import org.apache.flink.table.dataformat.vector.writable.WritableIntVector;
-import 
org.apache.flink.table.dataformat.vector.writable.WritableTimestampVector;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.vector.writable.WritableIntVector;
+import org.apache.flink.table.data.vector.writable.WritableTimestampVector;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -88,7 +88,7 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
                }
        }
 
-       public static SqlTimestamp decodeInt96ToTimestamp(
+       public static TimestampData decodeInt96ToTimestamp(
                        boolean utcTimestamp,
                        org.apache.parquet.column.Dictionary dictionary,
                        int id) {
@@ -100,17 +100,17 @@ public class TimestampColumnReader extends 
AbstractColumnReader<WritableTimestam
                return int96ToTimestamp(utcTimestamp, buffer.getLong(), 
buffer.getInt());
        }
 
-       private static SqlTimestamp int96ToTimestamp(
+       private static TimestampData int96ToTimestamp(
                        boolean utcTimestamp, long nanosOfDay, int julianDay) {
                long millisecond = julianDayToMillis(julianDay) + (nanosOfDay / 
NANOS_PER_MILLISECOND);
 
                if (utcTimestamp) {
                        int nanoOfMillisecond = (int) (nanosOfDay % 
NANOS_PER_MILLISECOND);
-                       return SqlTimestamp.fromEpochMillis(millisecond, 
nanoOfMillisecond);
+                       return TimestampData.fromEpochMillis(millisecond, 
nanoOfMillisecond);
                } else {
                        Timestamp timestamp = new Timestamp(millisecond);
                        timestamp.setNanos((int) (nanosOfDay % 
NANOS_PER_SECOND));
-                       return SqlTimestamp.fromTimestamp(timestamp);
+                       return TimestampData.fromTimestamp(timestamp);
                }
        }
 
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
index 508a36c..eecf1b8 100644
--- 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/row/ParquetRowDataWriterTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.ParquetWriterFactory;
 import org.apache.flink.formats.parquet.vector.ParquetColumnarRowSplitReader;
 import org.apache.flink.formats.parquet.vector.ParquetSplitReaderUtil;
-import org.apache.flink.table.dataformat.BaseRow;
-import org.apache.flink.table.dataformat.DataFormatConverters;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
 import org.apache.flink.table.types.logical.BooleanType;
@@ -83,7 +83,7 @@ public class ParquetRowDataWriterTest {
                        new DecimalType(20, 0));
 
        @SuppressWarnings("unchecked")
-       private static final DataFormatConverters.DataFormatConverter<BaseRow, 
Row> CONVERTER =
+       private static final DataFormatConverters.DataFormatConverter<RowData, 
Row> CONVERTER =
                        DataFormatConverters.getConverterForDataType(
                                        
TypeConversions.fromLogicalToDataType(ROW_TYPE));
 
@@ -126,9 +126,9 @@ public class ParquetRowDataWriterTest {
                                        BigDecimal.valueOf(v)));
                }
 
-               ParquetWriterFactory<BaseRow> factory = 
ParquetRowDataBuilder.createWriterFactory(
+               ParquetWriterFactory<RowData> factory = 
ParquetRowDataBuilder.createWriterFactory(
                                ROW_TYPE, conf, utcTimestamp);
-               BulkWriter<BaseRow> writer = 
factory.create(path.getFileSystem().create(
+               BulkWriter<RowData> writer = 
factory.create(path.getFileSystem().create(
                                path, FileSystem.WriteMode.OVERWRITE));
                for (int i = 0; i < number; i++) {
                        writer.addElement(CONVERTER.toInternal(rows.get(i)));
diff --git 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
index 758c267..d2098d5 100644
--- 
a/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
+++ 
b/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/vector/ParquetColumnarRowSplitReaderTest.java
@@ -19,9 +19,9 @@
 package org.apache.flink.formats.parquet.vector;
 
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.dataformat.ColumnarRow;
-import org.apache.flink.table.dataformat.Decimal;
-import org.apache.flink.table.dataformat.vector.VectorizedColumnBatch;
+import org.apache.flink.table.data.ColumnarRowData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.vector.VectorizedColumnBatch;
 import org.apache.flink.table.runtime.functions.SqlDateTimeUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.BigIntType;
@@ -212,7 +212,7 @@ public class ParquetColumnarRowSplitReaderTest {
 
                int i = start;
                while (!reader.reachedEnd()) {
-                       ColumnarRow row = reader.nextRecord();
+                       ColumnarRowData row = reader.nextRecord();
                        Integer v = values.get(i);
                        if (v == null) {
                                assertTrue(row.isNullAt(0));
@@ -405,7 +405,7 @@ public class ParquetColumnarRowSplitReaderTest {
                                Long.MAX_VALUE);
                int i = 0;
                while (!reader.reachedEnd()) {
-                       ColumnarRow row = reader.nextRecord();
+                       ColumnarRowData row = reader.nextRecord();
                        assertEquals(i, row.getDouble(0), 0);
                        assertEquals((byte) i, row.getByte(1));
                        assertEquals(i, row.getInt(2));
@@ -500,7 +500,7 @@ public class ParquetColumnarRowSplitReaderTest {
                                Long.MAX_VALUE);
                int i = 0;
                while (!reader.reachedEnd()) {
-                       ColumnarRow row = reader.nextRecord();
+                       ColumnarRowData row = reader.nextRecord();
 
                        // common values
                        assertEquals(i, row.getDouble(0), 0);
@@ -527,13 +527,13 @@ public class ParquetColumnarRowSplitReaderTest {
                                                LocalDateTime.of(1999, 1, 1, 1, 
1),
                                                row.getTimestamp(11, 
9).toLocalDateTime());
                                assertEquals(
-                                               Decimal.fromBigDecimal(new 
BigDecimal(24), 5, 0),
+                                               DecimalData.fromBigDecimal(new 
BigDecimal(24), 5, 0),
                                                row.getDecimal(12, 5, 0));
                                assertEquals(
-                                               Decimal.fromBigDecimal(new 
BigDecimal(25), 15, 0),
+                                               DecimalData.fromBigDecimal(new 
BigDecimal(25), 15, 0),
                                                row.getDecimal(13, 15, 0));
                                assertEquals(
-                                               Decimal.fromBigDecimal(new 
BigDecimal(26), 20, 0),
+                                               DecimalData.fromBigDecimal(new 
BigDecimal(26), 20, 0),
                                                row.getDecimal(14, 20, 0));
                                assertEquals("f27", 
row.getString(15).toString());
                        }

Reply via email to