This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bf8f5598d [parquet] Fix that cannot read parquet ROW<DECIMAL> data (#4533)
bf8f5598d is described below

commit bf8f5598d9a48f907f2346df39507c27877a5952
Author: yuzelin <[email protected]>
AuthorDate: Thu Nov 14 20:28:30 2024 +0800

    [parquet] Fix that cannot read parquet ROW<DECIMAL> data (#4533)
---
 .../data/columnar/heap/AbstractHeapVector.java     |  4 +-
 .../data/columnar/heap/ElementCountable.java       | 23 ++-------
 .../apache/paimon/flink/BatchFileStoreITCase.java  | 19 +++++++
 .../format/parquet/ParquetReaderFactory.java       |  6 ++-
 .../format/parquet/reader/NestedColumnReader.java  |  3 +-
 .../reader/NestedPrimitiveColumnReader.java        |  6 +--
 .../parquet/reader/ParquetDecimalVector.java       | 16 +++++-
 .../format/parquet/reader/RowColumnReader.java     | 59 ----------------------
 8 files changed, 50 insertions(+), 86 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
index 702877642..f0e82eac4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/AbstractHeapVector.java
@@ -25,7 +25,8 @@ import java.nio.ByteOrder;
 import java.util.Arrays;
 
 /** Heap vector that nullable shared structure. */
-public abstract class AbstractHeapVector extends AbstractWritableVector {
+public abstract class AbstractHeapVector extends AbstractWritableVector
+        implements ElementCountable {
 
     public static final boolean LITTLE_ENDIAN = ByteOrder.nativeOrder() == 
ByteOrder.LITTLE_ENDIAN;
 
@@ -116,6 +117,7 @@ public abstract class AbstractHeapVector extends AbstractWritableVector {
         return dictionaryIds;
     }
 
+    @Override
     public int getLen() {
         return this.len;
     }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
similarity index 60%
rename from paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
rename to paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
index fb6378349..a32762d65 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
+++ b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/ElementCountable.java
@@ -16,25 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.format.parquet.position;
+package org.apache.paimon.data.columnar.heap;
 
-import javax.annotation.Nullable;
+/** Container with a known number of elements. */
+public interface ElementCountable {
 
-/** To represent struct's position in repeated type. */
-public class RowPosition {
-    @Nullable private final boolean[] isNull;
-    private final int positionsCount;
-
-    public RowPosition(boolean[] isNull, int positionsCount) {
-        this.isNull = isNull;
-        this.positionsCount = positionsCount;
-    }
-
-    public boolean[] getIsNull() {
-        return isNull;
-    }
-
-    public int getPositionsCount() {
-        return positionsCount;
-    }
+    int getLen();
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index c30e6cd56..cdc114b04 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -34,6 +34,7 @@ import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -573,6 +574,24 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
         validateCount1NotPushDown(sql);
     }
 
+    @Test
+    public void testParquetRowDecimalAndTimestamp() {
+        sql(
+                "CREATE TABLE parquet_row_decimal(`row` ROW<f0 DECIMAL(2,1)>) 
WITH ('file.format' = 'parquet')");
+        sql("INSERT INTO parquet_row_decimal VALUES ( (ROW(1.2)) )");
+
+        assertThat(sql("SELECT * FROM parquet_row_decimal"))
+                .containsExactly(Row.of(Row.of(new BigDecimal("1.2"))));
+
+        sql(
+                "CREATE TABLE parquet_row_timestamp(`row` ROW<f0 
TIMESTAMP(0)>) WITH ('file.format' = 'parquet')");
+        sql("INSERT INTO parquet_row_timestamp VALUES ( 
(ROW(TIMESTAMP'2024-11-13 18:00:00')) )");
+
+        assertThat(sql("SELECT * FROM parquet_row_timestamp"))
+                .containsExactly(
+                        
Row.of(Row.of(DateTimeUtils.toLocalDateTime("2024-11-13 18:00:00", 0))));
+    }
+
     private void validateCount1PushDown(String sql) {
         Transformation<?> transformation = AbstractTestBase.translate(tEnv, 
sql);
         while (!transformation.getInputs().isEmpty()) {
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 53b4b1634..f0151d6f3 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.ColumnReader;
@@ -293,7 +294,10 @@ public class ParquetReaderFactory implements FormatReaderFactory {
         for (int i = 0; i < writableVectors.length; i++) {
             switch (projectedFields[i].type().getTypeRoot()) {
                 case DECIMAL:
-                    vectors[i] = new ParquetDecimalVector(writableVectors[i]);
+                    vectors[i] =
+                            new ParquetDecimalVector(
+                                    writableVectors[i],
+                                    ((ElementCountable) 
writableVectors[i]).getLen());
                     break;
                 case TIMESTAMP_WITHOUT_TIME_ZONE:
                 case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index c89c77603..68225fbd1 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
 import org.apache.paimon.data.columnar.heap.HeapArrayVector;
 import org.apache.paimon.data.columnar.heap.HeapMapVector;
 import org.apache.paimon.data.columnar.heap.HeapRowVector;
@@ -134,7 +135,7 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
                     String.format("Row field does not have any children: %s.", 
field));
         }
 
-        int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
+        int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
         boolean[] isNull = new boolean[len];
         Arrays.fill(isNull, true);
         boolean hasNull = false;
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index 7ee33a0bb..7d00ff792 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -495,7 +495,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
                                 phiv.vector[i] = ((List<Integer>) 
valueList).get(i);
                             }
                         }
-                        return new ParquetDecimalVector(phiv);
+                        return new ParquetDecimalVector(phiv, total);
                     case INT64:
                         HeapLongVector phlv = new HeapLongVector(total);
                         for (int i = 0; i < valueList.size(); i++) {
@@ -505,10 +505,10 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
                                 phlv.vector[i] = ((List<Long>) 
valueList).get(i);
                             }
                         }
-                        return new ParquetDecimalVector(phlv);
+                        return new ParquetDecimalVector(phlv, total);
                     default:
                         HeapBytesVector phbv = getHeapBytesVector(total, 
valueList);
-                        return new ParquetDecimalVector(phbv);
+                        return new ParquetDecimalVector(phbv, total);
                 }
             default:
                 throw new RuntimeException("Unsupported type in the list: " + 
type);
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index 28d308bac..42714ab06 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -25,6 +25,7 @@ import org.apache.paimon.data.columnar.DecimalColumnVector;
 import org.apache.paimon.data.columnar.Dictionary;
 import org.apache.paimon.data.columnar.IntColumnVector;
 import org.apache.paimon.data.columnar.LongColumnVector;
+import org.apache.paimon.data.columnar.heap.ElementCountable;
 import org.apache.paimon.data.columnar.writable.WritableBytesVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
@@ -38,12 +39,18 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
  * {@link DecimalColumnVector} interface.
  */
 public class ParquetDecimalVector
-        implements DecimalColumnVector, WritableLongVector, WritableIntVector, 
WritableBytesVector {
+        implements DecimalColumnVector,
+                WritableLongVector,
+                WritableIntVector,
+                WritableBytesVector,
+                ElementCountable {
 
     private final ColumnVector vector;
+    private final int len;
 
-    public ParquetDecimalVector(ColumnVector vector) {
+    public ParquetDecimalVector(ColumnVector vector, int len) {
         this.vector = vector;
+        this.len = len;
     }
 
     @Override
@@ -225,4 +232,9 @@ public class ParquetDecimalVector
             ((WritableLongVector) vector).fill(value);
         }
     }
+
+    @Override
+    public int getLen() {
+        return len;
+    }
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
deleted file mode 100644
index fa2da03ef..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.heap.HeapRowVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-
-import java.io.IOException;
-import java.util.List;
-
-/** Row {@link ColumnReader}. */
-public class RowColumnReader implements ColumnReader<WritableColumnVector> {
-
-    private final List<ColumnReader> fieldReaders;
-
-    public RowColumnReader(List<ColumnReader> fieldReaders) {
-        this.fieldReaders = fieldReaders;
-    }
-
-    @Override
-    public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
-        HeapRowVector rowVector = (HeapRowVector) vector;
-        WritableColumnVector[] vectors = rowVector.getFields();
-        // row vector null array
-        boolean[] isNulls = new boolean[readNumber];
-        for (int i = 0; i < vectors.length; i++) {
-            fieldReaders.get(i).readToVector(readNumber, vectors[i]);
-
-            for (int j = 0; j < readNumber; j++) {
-                if (i == 0) {
-                    isNulls[j] = vectors[i].isNullAt(j);
-                } else {
-                    isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
-                }
-                if (i == vectors.length - 1 && isNulls[j]) {
-                    // rowColumnVector[j] is null only when all fields[j] of 
rowColumnVector[j] is
-                    // null
-                    rowVector.setNullAt(j);
-                }
-            }
-        }
-    }
-}

Reply via email to