This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f90cc902 [core] fix after compactioned the array elements are 
duplicated (#2364)
5f90cc902 is described below

commit 5f90cc902b788ee0d35d1c1b2dcb496c1a41fed5
Author: Kerwin <[email protected]>
AuthorDate: Wed Nov 22 09:44:49 2023 +0800

    [core] fix after compactioned the array elements are duplicated (#2364)
---
 .../flink/FullCompactionFileStoreITCase.java       | 27 ++++++++++++++++++++--
 .../format/orc/reader/OrcRowColumnVector.java      |  7 +++---
 2 files changed, 28 insertions(+), 6 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
index ec3e68ce1..e23dff160 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
@@ -28,19 +28,20 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** SQL ITCase for continuous file store. */
 public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
     private final String table = "T";
+    private final String options =
+            " WITH('changelog-producer'='full-compaction', 
'changelog-producer.compaction-interval' = '1s')";
 
     @Override
     @BeforeEach
     public void before() throws IOException {
         super.before();
-        String options =
-                " WITH('changelog-producer'='full-compaction', 
'changelog-producer.compaction-interval' = '1s')";
         tEnv.executeSql(
                 "CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING, 
PRIMARY KEY (a) NOT ENFORCED)"
                         + options);
@@ -59,6 +60,28 @@ public class FullCompactionFileStoreITCase extends 
CatalogITCaseBase {
         assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7", 
"8", "9"));
     }
 
+    /** Test streaming read with array and row nested data type. */
+    @Test
+    public void testStreamingReadOfArray() throws Exception {
+        String table = "T_ARRAY";
+        tEnv.executeSql(
+                "CREATE TABLE IF NOT EXISTS "
+                        + table
+                        + "("
+                        + "ID INT PRIMARY KEY NOT ENFORCED,\n"
+                        + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
+                        + ")"
+                        + options);
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+        sql(
+                "INSERT INTO %s VALUES (1, ARRAY[('c','mark1'), ('d','mark2'), 
('e','mark3')]);",
+                table);
+        
assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+                .containsExactlyInAnyOrder("+I[1, [+I[c, mark1], +I[d, mark2], 
+I[e, mark3]]]");
+    }
+
     @Test
     public void testCompactedScanMode() throws Exception {
         BlockingIterator<Row, Row> iterator =
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
index 698655e8a..f80729707 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
@@ -29,7 +29,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 public class OrcRowColumnVector extends AbstractOrcColumnVector
         implements org.apache.paimon.data.columnar.RowColumnVector {
 
-    private final ColumnarRow columnarRow;
+    private final VectorizedColumnBatch batch;
 
     public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
         super(hiveVector);
@@ -38,12 +38,11 @@ public class OrcRowColumnVector extends 
AbstractOrcColumnVector
         for (int i = 0; i < len; i++) {
             paimonVectors[i] = createPaimonVector(hiveVector.fields[i], 
type.getTypeAt(i));
         }
-        this.columnarRow = new ColumnarRow(new 
VectorizedColumnBatch(paimonVectors));
+        this.batch = new VectorizedColumnBatch(paimonVectors);
     }
 
     @Override
     public ColumnarRow getRow(int i) {
-        this.columnarRow.setRowId(i);
-        return this.columnarRow;
+        return new ColumnarRow(batch, i);
     }
 }

Reply via email to