This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 10540c75c [Bug] Fix the wrong record count for partitions table (#2060)
10540c75c is described below
commit 10540c75c99494df2dacc9589e6ed04877305ebc
Author: Aitozi <[email protected]>
AuthorDate: Thu Sep 28 00:34:10 2023 +0800
[Bug] Fix the wrong record count for partitions table (#2060)
---
.../paimon/table/system/PartitionsTable.java | 3 +-
.../org/apache/paimon/table/TableTestBase.java | 20 ++++-
.../paimon/table/system/PartitionsTableTest.java | 96 ++++++++++++++++++++++
3 files changed, 116 insertions(+), 3 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index d2fe41315..9dacf9d5f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -274,8 +274,7 @@ public class PartitionsTable implements ReadonlyTable {
// Grouping and summing
Partition rowData =
groupedData.computeIfAbsent(
- partitionId,
- key -> new Partition(partitionId, recordCount,
fileSizeInBytes));
+ partitionId, key -> new Partition(partitionId,
0, 0));
rowData.recordCount += recordCount;
rowData.fileSizeInBytes += fileSizeInBytes;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 9f305182f..322d0c8ad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -40,12 +40,15 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -116,15 +119,30 @@ public abstract class TableTestBase {
protected List<InternalRow> read(Table table, Pair<ConfigOption<?>,
String>... dynamicOptions)
throws Exception {
+ return read(table, null, dynamicOptions);
+ }
+
+ protected List<InternalRow> read(
+ Table table,
+ @Nullable int[][] projection,
+ Pair<ConfigOption<?>, String>... dynamicOptions)
+ throws Exception {
Map<String, String> options = new HashMap<>();
for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
options.put(pair.getKey().key(), pair.getValue());
}
table = table.copy(options);
ReadBuilder readBuilder = table.newReadBuilder();
+ if (projection != null) {
+ readBuilder.withProjection(projection);
+ }
RecordReader<InternalRow> reader =
readBuilder.newRead().createReader(readBuilder.newScan().plan());
- InternalRowSerializer serializer = new
InternalRowSerializer(table.rowType());
+ InternalRowSerializer serializer =
+ new InternalRowSerializer(
+ projection == null
+ ? table.rowType()
+ :
Projection.of(projection).project(table.rowType()));
List<InternalRow> rows = new ArrayList<>();
reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
return rows;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
new file mode 100644
index 000000000..0a9a0fe52
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -0,0 +1,96 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PartitionsTable}. */
+public class PartitionsTableTest extends TableTestBase {
+
+ private static final String tableName = "MyTable";
+
+ private FileStoreTable table;
+
+ private PartitionsTable partitionsTable;
+
+ @BeforeEach
+ public void before() throws Exception {
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, tableName));
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .option(CoreOptions.BUCKET.key(), "2")
+ .build();
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath),
schema);
+ table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath,
tableSchema);
+
+ Identifier filesTableId =
+ identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER +
PartitionsTable.PARTITIONS);
+ partitionsTable = (PartitionsTable) catalog.getTable(filesTableId);
+
+ // snapshot 1: append
+ write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5));
+
+ write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4));
+ }
+
+ @Test
+ public void testPartitionRecordCount() throws Exception {
+ List<InternalRow> expectedRow = new ArrayList<>();
+ expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 2L));
+ expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L));
+
+ // Only read partition and record count; the file size may not be stable.
+ List<InternalRow> result = read(partitionsTable, new int[][] {{0},
{1}});
+ assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
+ }
+}