This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 54a80ed02 [format] ParquetDataColumnReader supports reading timestamp with precision <= 6 (#1599)
54a80ed02 is described below
commit 54a80ed02d88e3f6286d2981c3615094ad1ed076
Author: yuzelin <[email protected]>
AuthorDate: Fri Jul 21 19:12:45 2023 +0800
[format] ParquetDataColumnReader supports reading timestamp with precision <= 6 (#1599)
---
.../java/org/apache/paimon/data/GenericArray.java | 14 +-
.../apache/paimon/data/columnar/ColumnarArray.java | 6 +
.../main/java/org/apache/paimon/types/RowType.java | 14 ++
.../java/org/apache/paimon/utils/ArrayUtils.java | 28 ++--
.../apache/paimon/format/FormatReadWriteTest.java | 182 +++++++++++++++++----
.../format/parquet/reader/ArrayColumnReader.java | 35 +++-
.../parquet/reader/ParquetDataColumnReader.java | 7 +-
.../reader/ParquetDataColumnReaderFactory.java | 16 +-
.../format/avro/AvroFormatReadWriteTest.java | 4 +
.../OrcFormatReadWriteTest.java} | 11 +-
.../ParquetFormatReadWriteTest.java} | 13 +-
.../paimon/hive/PaimonStorageHandlerITCase.java | 32 ++--
12 files changed, 274 insertions(+), 88 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
index c95ad6525..78cfa2350 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/GenericArray.java
@@ -263,7 +263,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (boolean[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Boolean[]) array);
+ return ArrayUtils.toPrimitiveBoolean((Object[]) array);
}
@Override
@@ -272,7 +272,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (byte[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Byte[]) array);
+ return ArrayUtils.toPrimitiveByte((Object[]) array);
}
@Override
@@ -281,7 +281,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (short[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Short[]) array);
+ return ArrayUtils.toPrimitiveShort((Object[]) array);
}
@Override
@@ -290,7 +290,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (int[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Integer[]) array);
+ return ArrayUtils.toPrimitiveInteger((Object[]) array);
}
@Override
@@ -299,7 +299,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (long[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Long[]) array);
+ return ArrayUtils.toPrimitiveLong((Object[]) array);
}
@Override
@@ -308,7 +308,7 @@ public final class GenericArray implements InternalArray,
Serializable {
return (float[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Float[]) array);
+ return ArrayUtils.toPrimitiveFloat((Object[]) array);
}
@Override
@@ -317,6 +317,6 @@ public final class GenericArray implements InternalArray,
Serializable {
return (double[]) array;
}
checkNoNull();
- return ArrayUtils.toPrimitive((Double[]) array);
+ return ArrayUtils.toPrimitiveDouble((Object[]) array);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
index d13bc7165..7230564ba 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarArray.java
@@ -246,4 +246,10 @@ public final class ColumnarArray implements InternalArray,
DataSetters, Serializ
private BytesColumnVector.Bytes getByteArray(int pos) {
return ((BytesColumnVector) data).getBytes(offset + pos);
}
+
+ @Override
+ public boolean equals(Object o) {
+ throw new UnsupportedOperationException(
+ "ColumnarArray do not support equals, please compare fields
one by one!");
+ }
}
diff --git a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
index a9a41da20..f0d41486b 100644
--- a/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-common/src/main/java/org/apache/paimon/types/RowType.java
@@ -19,6 +19,7 @@
package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -33,6 +34,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
/**
* Data type of a sequence of fields. A field consists of a field name, field
type, and an optional
@@ -51,6 +53,7 @@ public final class RowType extends DataType {
public static final String FORMAT = "ROW<%s>";
private final List<DataField> fields;
+ private InternalRow.FieldGetter[] fieldGetters;
public RowType(boolean isNullable, List<DataField> fields) {
super(isNullable, DataTypeRoot.ROW);
@@ -252,4 +255,15 @@ public final class RowType extends DataType {
return new RowType(isNullable, fields);
}
}
+
+ public InternalRow.FieldGetter[] fieldGetters() {
+ if (fieldGetters == null) {
+ fieldGetters =
+ IntStream.range(0, getFieldCount())
+ .mapToObj(i ->
InternalRow.createFieldGetter(getTypeAt(i), i))
+ .toArray(InternalRow.FieldGetter[]::new);
+ }
+
+ return fieldGetters;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ArrayUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ArrayUtils.java
index d34df6ce8..68972a068 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/ArrayUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ArrayUtils.java
@@ -210,7 +210,7 @@ public class ArrayUtils {
* @return a {@code boolean} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static boolean[] toPrimitive(final Boolean[] array) {
+ public static boolean[] toPrimitiveBoolean(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -218,7 +218,7 @@ public class ArrayUtils {
}
final boolean[] result = new boolean[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (boolean) array[i];
}
return result;
}
@@ -232,7 +232,7 @@ public class ArrayUtils {
* @return a {@code byte} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static byte[] toPrimitive(final Byte[] array) {
+ public static byte[] toPrimitiveByte(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -240,7 +240,7 @@ public class ArrayUtils {
}
final byte[] result = new byte[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (byte) array[i];
}
return result;
}
@@ -254,7 +254,7 @@ public class ArrayUtils {
* @return a {@code double} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static double[] toPrimitive(final Double[] array) {
+ public static double[] toPrimitiveDouble(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -262,7 +262,7 @@ public class ArrayUtils {
}
final double[] result = new double[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (double) array[i];
}
return result;
}
@@ -276,7 +276,7 @@ public class ArrayUtils {
* @return a {@code float} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static float[] toPrimitive(final Float[] array) {
+ public static float[] toPrimitiveFloat(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -284,7 +284,7 @@ public class ArrayUtils {
}
final float[] result = new float[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (float) array[i];
}
return result;
}
@@ -298,7 +298,7 @@ public class ArrayUtils {
* @return an {@code int} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static int[] toPrimitive(final Integer[] array) {
+ public static int[] toPrimitiveInteger(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -306,7 +306,7 @@ public class ArrayUtils {
}
final int[] result = new int[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (int) array[i];
}
return result;
}
@@ -320,7 +320,7 @@ public class ArrayUtils {
* @return a {@code long} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static long[] toPrimitive(final Long[] array) {
+ public static long[] toPrimitiveLong(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -328,7 +328,7 @@ public class ArrayUtils {
}
final long[] result = new long[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (long) array[i];
}
return result;
}
@@ -342,7 +342,7 @@ public class ArrayUtils {
* @return a {@code byte} array, {@code null} if null array input
* @throws NullPointerException if array content is {@code null}
*/
- public static short[] toPrimitive(final Short[] array) {
+ public static short[] toPrimitiveShort(final Object[] array) {
if (array == null) {
return null;
} else if (array.length == 0) {
@@ -350,7 +350,7 @@ public class ArrayUtils {
}
final short[] result = new short[array.length];
for (int i = 0; i < array.length; i++) {
- result[i] = array[i];
+ result[i] = (short) array[i];
}
return result;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
index cd2fd36da..43194946d 100644
---
a/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/format/FormatReadWriteTest.java
@@ -22,6 +22,8 @@ import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericArray;
import org.apache.paimon.data.GenericMap;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -30,6 +32,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -40,6 +43,7 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
@@ -53,9 +57,15 @@ public abstract class FormatReadWriteTest {
@TempDir java.nio.file.Path tempPath;
+ private final String formatType;
+
private FileIO fileIO;
private Path file;
+ protected FormatReadWriteTest(String formatType) {
+ this.formatType = formatType;
+ }
+
@BeforeEach
public void beforeEach() {
this.fileIO = LocalFileIO.create();
@@ -94,28 +104,41 @@ public abstract class FormatReadWriteTest {
GenericRow.of(1, 1L), GenericRow.of(2, 2L),
GenericRow.of(3, null));
}
+ /**
+ * Currently, Parquet format doesn't support nested row in array, so this
test handles Parquet
+ * specially.
+ */
@Test
public void testFullTypes() throws IOException {
- RowType rowType =
+ RowType rowType = rowTypeForFullTypesTest();
+ InternalRow expected = expectedRowForFullTypesTest();
+ FileFormat format = fileFormat();
+
+ PositionOutputStream out = fileIO.newOutputStream(file, false);
+ FormatWriter writer = format.createWriterFactory(rowType).create(out,
null);
+ writer.addElement(expected);
+ writer.flush();
+ writer.finish();
+ out.close();
+
+ RecordReader<InternalRow> reader =
+ format.createReaderFactory(rowType).createReader(fileIO, file);
+ List<InternalRow> result = new ArrayList<>();
+ reader.forEachRemaining(result::add);
+ assertThat(result.size()).isEqualTo(1);
+
+ validateFullTypesResult(result.get(0), expected);
+ }
+
+ private RowType rowTypeForFullTypesTest() {
+ RowType.Builder builder =
RowType.builder()
.field("id", DataTypes.INT().notNull())
.field("name", DataTypes.STRING()) /* optional by
default */
.field("salary", DataTypes.DOUBLE().notNull())
.field(
"locations",
- DataTypes.MAP(
- DataTypes.STRING().notNull(),
- DataTypes.ROW(
- DataTypes.FIELD(
- 0,
- "posX",
-
DataTypes.DOUBLE().notNull(),
- "X field"),
- DataTypes.FIELD(
- 1,
- "posY",
-
DataTypes.DOUBLE().notNull(),
- "Y field"))))
+ DataTypes.MAP(DataTypes.STRING().notNull(),
getMapValueType()))
.field("strArray",
DataTypes.ARRAY(DataTypes.STRING()).nullable())
.field("intArray",
DataTypes.ARRAY(DataTypes.INT()).nullable())
.field("boolean", DataTypes.BOOLEAN().nullable())
@@ -128,27 +151,45 @@ public abstract class FormatReadWriteTest {
.field("date", DataTypes.DATE())
.field("decimal", DataTypes.DECIMAL(2, 2))
.field("decimal2", DataTypes.DECIMAL(38, 2))
- .field("decimal3", DataTypes.DECIMAL(10, 1))
- .build();
+ .field("decimal3", DataTypes.DECIMAL(10, 1));
+
+ if (formatType.equals("avro") || formatType.equals("orc")) {
+ builder.field(
+ "rowArray",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD(
+ 0,
+ "int0",
+ DataTypes.INT().notNull(),
+ "nested row int field 0"),
+ DataTypes.FIELD(
+ 1,
+ "double1",
+ DataTypes.DOUBLE().notNull(),
+ "nested row double field 1"))));
+ }
+ RowType rowType = builder.build();
if (ThreadLocalRandom.current().nextBoolean()) {
rowType = (RowType) rowType.notNull();
}
- FileFormat format = fileFormat();
+ return rowType;
+ }
- PositionOutputStream out = fileIO.newOutputStream(file, false);
- FormatWriter writer = format.createWriterFactory(rowType).create(out,
null);
- GenericRow expected =
- GenericRow.of(
+ private GenericRow expectedRowForFullTypesTest() {
+ Object[] mapValueData = getMapValueData();
+ List<Object> values =
+ Arrays.asList(
1,
fromString("name"),
5.26D,
new GenericMap(
new HashMap<Object, Object>() {
{
- this.put(fromString("key1"),
GenericRow.of(5.2D, 6.2D));
- this.put(fromString("key2"),
GenericRow.of(6.2D, 2.2D));
+ this.put(fromString("key1"),
mapValueData[0]);
+ this.put(fromString("key2"),
mapValueData[1]);
}
}),
new GenericArray(new Object[] {fromString("123"),
fromString("456")}),
@@ -161,19 +202,90 @@ public abstract class FormatReadWriteTest {
Timestamp.fromMicros(123123123),
Timestamp.fromEpochMillis(123123123),
2456,
- Decimal.fromBigDecimal(new BigDecimal(0.22), 2, 2),
- Decimal.fromBigDecimal(new BigDecimal(12312455.22),
38, 2),
- Decimal.fromBigDecimal(new BigDecimal(12455.1), 10,
1));
- writer.addElement(expected);
- writer.flush();
- writer.finish();
- out.close();
+ Decimal.fromBigDecimal(new BigDecimal("0.22"), 2, 2),
+ Decimal.fromBigDecimal(new BigDecimal("12312455.22"),
38, 2),
+ Decimal.fromBigDecimal(new BigDecimal("12455.1"), 10,
1));
- RecordReader<InternalRow> reader =
- format.createReaderFactory(rowType).createReader(fileIO, file);
- List<InternalRow> result = new ArrayList<>();
- reader.forEachRemaining(result::add);
+ if (formatType.equals("avro") || formatType.equals("orc")) {
+ values = new ArrayList<>(values);
+ values.add(
+ new GenericArray(
+ new Object[] {GenericRow.of(1, 0.1D),
GenericRow.of(2, 0.2D)}));
+ }
+ return GenericRow.of(values.toArray());
+ }
- assertThat(result).containsExactly(expected);
+ private DataType getMapValueType() {
+ if (formatType.equals("avro") || formatType.equals("orc")) {
+ return DataTypes.ROW(
+ DataTypes.FIELD(0, "posX", DataTypes.DOUBLE().notNull(),
"X field"),
+ DataTypes.FIELD(1, "posY", DataTypes.DOUBLE().notNull(),
"Y field"));
+ } else {
+ return DataTypes.DOUBLE();
+ }
+ }
+
+ private Object[] getMapValueData() {
+ if (formatType.equals("avro") || formatType.equals("orc")) {
+ // allow nested row in array
+ return new Object[] {GenericRow.of(5.2D, 6.2D),
GenericRow.of(6.2D, 2.2D)};
+ } else {
+ return new Object[] {5.2D, 6.2D};
+ }
+ }
+
+ private void validateFullTypesResult(InternalRow actual, InternalRow
expected) {
+ if (formatType.equals("avro") || formatType.equals("orc")) {
+ assertThat(actual).isEqualTo(expected);
+ } else {
+ RowType rowType = rowTypeForFullTypesTest();
+ InternalRow.FieldGetter[] fieldGetters = rowType.fieldGetters();
+ for (int i = 0; i < fieldGetters.length; i++) {
+ Object actualField = fieldGetters[i].getFieldOrNull(actual);
+ Object expectedField =
fieldGetters[i].getFieldOrNull(expected);
+ // Fields 3 (locations), 4 (strArray), 5 (intArray) are compared element-wise.
+ if (i == 3) {
+ validateInternalMap((InternalMap) actualField,
(InternalMap) expectedField);
+ } else if (i == 4) {
+ validateInternalArray(
+ (InternalArray) actualField,
+ (InternalArray) expectedField,
+ DataTypes.STRING());
+ } else if (i == 5) {
+ validateInternalArray(
+ (InternalArray) actualField,
+ (InternalArray) expectedField,
+ DataTypes.INT());
+ } else {
+ assertThat(actualField).isEqualTo(expectedField);
+ }
+ }
+ }
+ }
+
+ private void validateInternalMap(InternalMap actualMap, InternalMap
expectedMap) {
+ validateInternalArray(actualMap.keyArray(), expectedMap.keyArray(),
DataTypes.STRING());
+ validateInternalArray(actualMap.valueArray(),
expectedMap.valueArray(), DataTypes.DOUBLE());
+ }
+
+ private void validateInternalArray(
+ InternalArray actualArray, InternalArray expectedArray, DataType
elementType) {
+ switch (elementType.getTypeRoot()) {
+ case VARCHAR:
+ for (int i = 0; i < actualArray.size(); i++) {
+
assertThat(actualArray.getString(i)).isEqualTo(expectedArray.getString(i));
+ }
+ break;
+ case DOUBLE:
+
assertThat(actualArray.toDoubleArray()).isEqualTo(expectedArray.toDoubleArray());
+ break;
+ case INTEGER:
+
assertThat(actualArray.toIntArray()).isEqualTo(expectedArray.toIntArray());
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Haven't implemented array comparing for type "
+ + elementType.getTypeRoot());
+ }
}
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
index 5088c1cc1..ed33d9cc4 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
@@ -32,6 +32,8 @@ import
org.apache.paimon.data.columnar.heap.HeapTimestampVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReader;
@@ -42,6 +44,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
+
/** Array {@link ColumnReader}. TODO Currently ARRAY type only support non
nested case. */
public class ArrayColumnReader extends BaseVectorizedColumnReader {
@@ -169,7 +173,21 @@ public class ArrayColumnReader extends
BaseVectorizedColumnReader {
}
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- return dataColumn.readTimestamp();
+ int precision;
+ if (type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) {
+ precision = ((TimestampType) type).getPrecision();
+ } else {
+ precision = ((LocalZonedTimestampType)
type).getPrecision();
+ }
+
+ if (precision <= 3) {
+ return dataColumn.readMillsTimestamp();
+ } else if (precision <= 6) {
+ return dataColumn.readMicrosTimestamp();
+ } else {
+ throw new RuntimeException(
+ "Unsupported precision of time type in the list: "
+ precision);
+ }
default:
throw new RuntimeException("Unsupported type in the list: " +
type);
}
@@ -406,17 +424,19 @@ public class ArrayColumnReader extends
BaseVectorizedColumnReader {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
if (descriptor.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
- HeapLongVector heapLongVector = new HeapLongVector(total);
- heapLongVector.reset();
- lcv.setChild(new ParquetTimestampVector(heapLongVector));
+ HeapTimestampVector heapTimestampVector = new
HeapTimestampVector(total);
+ heapTimestampVector.reset();
+ lcv.setChild(new
ParquetTimestampVector(heapTimestampVector));
for (int i = 0; i < valueList.size(); i++) {
if (valueList.get(i) == null) {
- ((HeapLongVector) ((ParquetTimestampVector)
lcv.getChild()).getVector())
+ ((HeapTimestampVector)
+ ((ParquetTimestampVector)
lcv.getChild()).getVector())
.setNullAt(i);
} else {
- ((HeapLongVector) ((ParquetTimestampVector)
lcv.getChild()).getVector())
- .vector[i] =
- ((List<Long>) valueList).get(i);
+ // Write slot i only; fill() would overwrite every slot with this value.
+ ((HeapTimestampVector)
+ ((ParquetTimestampVector)
lcv.getChild()).getVector())
+ .setTimestamp(i, ((List<Timestamp>)
valueList).get(i));
}
}
break;
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
index f6dfed253..1ae3fc656 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
@@ -65,8 +65,11 @@ public interface ParquetDataColumnReader {
/** @return the next Bytes from the page */
byte[] readBytes();
- /** @return the next TimestampData from the page */
- Timestamp readTimestamp();
+ /** @return the next Timestamp with millisecond precision. */
+ Timestamp readMillsTimestamp();
+
+ /** @return the next Timestamp with microsecond precision. */
+ Timestamp readMicrosTimestamp();
/** @return the underlying dictionary if current reader is dictionary
encoded */
Dictionary getDictionary();
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
index ebcb6da3a..271724393 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
@@ -106,8 +106,13 @@ public final class ParquetDataColumnReaderFactory {
}
@Override
- public Timestamp readTimestamp() {
- throw new RuntimeException("Unsupported operation");
+ public Timestamp readMillsTimestamp() {
+ return Timestamp.fromEpochMillis(valuesReader.readLong());
+ }
+
+ @Override
+ public Timestamp readMicrosTimestamp() {
+ return Timestamp.fromMicros(valuesReader.readLong());
}
@Override
@@ -199,7 +204,12 @@ public final class ParquetDataColumnReaderFactory {
}
@Override
- public Timestamp readTimestamp() {
+ public Timestamp readMillsTimestamp() {
+ return convert(valuesReader.readBytes());
+ }
+
+ @Override
+ public Timestamp readMicrosTimestamp() {
return convert(valuesReader.readBytes());
}
}
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
index 6202b7a7a..294a29395 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
@@ -25,6 +25,10 @@ import org.apache.paimon.options.Options;
/** An avro {@link FormatReadWriteTest}. */
public class AvroFormatReadWriteTest extends FormatReadWriteTest {
+ protected AvroFormatReadWriteTest() {
+ super("avro");
+ }
+
@Override
protected FileFormat fileFormat() {
return new AvroFileFormat(new Options());
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
similarity index 79%
copy from
paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
copy to
paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
index 6202b7a7a..3d3d03fcc 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/orc/OrcFormatReadWriteTest.java
@@ -16,15 +16,20 @@
* limitations under the License.
*/
-package org.apache.paimon.format.avro;
+package org.apache.paimon.format.orc;
import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
import org.apache.paimon.options.Options;
-/** An avro {@link FormatReadWriteTest}. */
-public class AvroFormatReadWriteTest extends FormatReadWriteTest {
+/** An orc {@link FormatReadWriteTest}. */
+public class OrcFormatReadWriteTest extends FormatReadWriteTest {
+
+ protected OrcFormatReadWriteTest() {
+ super("orc");
+ }
@Override
protected FileFormat fileFormat() {
- return new AvroFileFormat(new Options());
+ return new OrcFileFormat(new FileFormatFactory.FormatContext(new Options(), 1024));
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
similarity index 72%
copy from
paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
copy to
paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
index 6202b7a7a..4cfcffe22 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/avro/AvroFormatReadWriteTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java
@@ -16,17 +16,22 @@
* limitations under the License.
*/
-package org.apache.paimon.format.avro;
+package org.apache.paimon.format.parquet;
import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.format.FileFormatFactory;
import org.apache.paimon.format.FormatReadWriteTest;
import org.apache.paimon.options.Options;
-/** An avro {@link FormatReadWriteTest}. */
-public class AvroFormatReadWriteTest extends FormatReadWriteTest {
+/** A parquet {@link FormatReadWriteTest}. */
+public class ParquetFormatReadWriteTest extends FormatReadWriteTest {
+
+ protected ParquetFormatReadWriteTest() {
+ super("parquet");
+ }
@Override
protected FileFormat fileFormat() {
- return new AvroFileFormat(new Options());
+ return new ParquetFileFormat(new FileFormatFactory.FormatContext(new
Options(), 1024));
}
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
index 5e3b69586..462d85a0d 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/PaimonStorageHandlerITCase.java
@@ -990,13 +990,11 @@ public class PaimonStorageHandlerITCase {
@Test
public void testMapKey() throws Exception {
Options conf = getBasicConf();
- // TODO fix PARQUET
- // conf.set(
- // CoreOptions.FILE_FORMAT,
- // ThreadLocalRandom.current().nextBoolean()
- // ? CoreOptions.FileFormatType.ORC
- // : CoreOptions.FileFormatType.PARQUET);
- conf.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.ORC);
+ conf.set(
+ CoreOptions.FILE_FORMAT,
+ ThreadLocalRandom.current().nextBoolean()
+ ? CoreOptions.FileFormatType.ORC
+ : CoreOptions.FileFormatType.PARQUET);
Table table =
FileStoreTestUtils.createFileStoreTable(
conf,
@@ -1004,13 +1002,15 @@ public class PaimonStorageHandlerITCase {
new DataType[] {
DataTypes.MAP(DataTypes.DATE(),
DataTypes.STRING()),
DataTypes.MAP(DataTypes.TIMESTAMP(3),
DataTypes.STRING()),
+ DataTypes.MAP(DataTypes.TIMESTAMP(5),
DataTypes.STRING()),
DataTypes.MAP(DataTypes.DECIMAL(2, 1),
DataTypes.STRING()),
DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()),
DataTypes.MAP(DataTypes.VARCHAR(10),
DataTypes.STRING())
},
new String[] {
"date_key",
- "timestamp_key",
+ "timestamp3_key",
+ "timestamp5_key",
"decimal_key",
"string_key",
"varchar_key"
@@ -1024,11 +1024,16 @@ public class PaimonStorageHandlerITCase {
Map<Integer, BinaryString> dateMap =
Collections.singletonMap(375, BinaryString.fromString("Date
1971-01-11"));
- Map<Timestamp, BinaryString> timestampMap =
+ Map<Timestamp, BinaryString> timestamp3Map =
Collections.singletonMap(
Timestamp.fromLocalDateTime(
LocalDateTime.of(2023, 7, 18, 12, 29, 59,
123_000_000)),
BinaryString.fromString("Test timestamp(3)"));
+ Map<Timestamp, BinaryString> timestamp5Map =
+ Collections.singletonMap(
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.of(2023, 7, 18, 12, 29, 59,
123_450_000)),
+ BinaryString.fromString("Test timestamp(5)"));
Map<Decimal, BinaryString> decimalMap =
Collections.singletonMap(
Decimal.fromBigDecimal(new BigDecimal("1.2"), 2, 1),
@@ -1043,7 +1048,8 @@ public class PaimonStorageHandlerITCase {
write.write(
GenericRow.of(
new GenericMap(dateMap),
- new GenericMap(timestampMap),
+ new GenericMap(timestamp3Map),
+ new GenericMap(timestamp5Map),
new GenericMap(decimalMap),
new GenericMap(stringMap),
new GenericMap(varcharMap)));
@@ -1053,11 +1059,13 @@ public class PaimonStorageHandlerITCase {
createExternalTable();
Assert.assertEquals(
- Collections.singletonList("Date 1971-01-11\tTest
timestamp(3)\t一点二\tHive\tPaimon"),
+ Collections.singletonList(
+ "Date 1971-01-11\tTest timestamp(3)\tTest
timestamp(5)\t一点二\tHive\tPaimon"),
hiveShell.executeQuery(
"SELECT "
+ "date_key[CAST('1971-01-11' AS DATE)],"
- + "timestamp_key[CAST('2023-7-18 12:29:59.123'
AS TIMESTAMP)],"
+ + "timestamp3_key[CAST('2023-7-18
12:29:59.123' AS TIMESTAMP)],"
+ + "timestamp5_key[CAST('2023-7-18
12:29:59.12345' AS TIMESTAMP)],"
+ "decimal_key[1.2],"
+ "string_key['Engine'],"
+ "varchar_key['Name']"