This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5f90cc902 [core] fix after compactioned the array elements are
duplicated (#2364)
5f90cc902 is described below
commit 5f90cc902b788ee0d35d1c1b2dcb496c1a41fed5
Author: Kerwin <[email protected]>
AuthorDate: Wed Nov 22 09:44:49 2023 +0800
[core] fix after compactioned the array elements are duplicated (#2364)
---
.../flink/FullCompactionFileStoreITCase.java | 27 ++++++++++++++++++++--
.../format/orc/reader/OrcRowColumnVector.java | 7 +++---
2 files changed, 28 insertions(+), 6 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
index ec3e68ce1..e23dff160 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FullCompactionFileStoreITCase.java
@@ -28,19 +28,20 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/** SQL ITCase for continuous file store. */
public class FullCompactionFileStoreITCase extends CatalogITCaseBase {
private final String table = "T";
+ private final String options =
+ " WITH('changelog-producer'='full-compaction',
'changelog-producer.compaction-interval' = '1s')";
@Override
@BeforeEach
public void before() throws IOException {
super.before();
- String options =
- " WITH('changelog-producer'='full-compaction',
'changelog-producer.compaction-interval' = '1s')";
tEnv.executeSql(
"CREATE TABLE IF NOT EXISTS T (a STRING, b STRING, c STRING,
PRIMARY KEY (a) NOT ENFORCED)"
+ options);
@@ -59,6 +60,28 @@ public class FullCompactionFileStoreITCase extends
CatalogITCaseBase {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("7",
"8", "9"));
}
+ /** Test streaming read with array and row nested data type. */
+ @Test
+ public void testStreamingReadOfArray() throws Exception {
+ String table = "T_ARRAY";
+ tEnv.executeSql(
+ "CREATE TABLE IF NOT EXISTS "
+ + table
+ + "("
+ + "ID INT PRIMARY KEY NOT ENFORCED,\n"
+ + "NAMES ARRAY<ROW<NAME STRING, MARK STRING>>\n"
+ + ")"
+ + options);
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(streamSqlIter("SELECT * FROM %s", table));
+
+ sql(
+ "INSERT INTO %s VALUES (1, ARRAY[('c','mark1'), ('d','mark2'),
('e','mark3')]);",
+ table);
+
assertThat(iterator.collect(1).stream().map(Row::toString).collect(Collectors.toList()))
+ .containsExactlyInAnyOrder("+I[1, [+I[c, mark1], +I[d, mark2],
+I[e, mark3]]]");
+ }
+
@Test
public void testCompactedScanMode() throws Exception {
BlockingIterator<Row, Row> iterator =
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
index 698655e8a..f80729707 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/reader/OrcRowColumnVector.java
@@ -29,7 +29,7 @@ import
org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
public class OrcRowColumnVector extends AbstractOrcColumnVector
implements org.apache.paimon.data.columnar.RowColumnVector {
- private final ColumnarRow columnarRow;
+ private final VectorizedColumnBatch batch;
public OrcRowColumnVector(StructColumnVector hiveVector, RowType type) {
super(hiveVector);
@@ -38,12 +38,11 @@ public class OrcRowColumnVector extends
AbstractOrcColumnVector
for (int i = 0; i < len; i++) {
paimonVectors[i] = createPaimonVector(hiveVector.fields[i],
type.getTypeAt(i));
}
- this.columnarRow = new ColumnarRow(new
VectorizedColumnBatch(paimonVectors));
+ this.batch = new VectorizedColumnBatch(paimonVectors);
}
@Override
public ColumnarRow getRow(int i) {
- this.columnarRow.setRowId(i);
- return this.columnarRow;
+ return new ColumnarRow(batch, i);
}
}