This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 10540c75c [Bug] Fix the wrong record count for partitions table (#2060)
10540c75c is described below

commit 10540c75c99494df2dacc9589e6ed04877305ebc
Author: Aitozi <[email protected]>
AuthorDate: Thu Sep 28 00:34:10 2023 +0800

    [Bug] Fix the wrong record count for partitions table (#2060)
---
 .../paimon/table/system/PartitionsTable.java       |  3 +-
 .../org/apache/paimon/table/TableTestBase.java     | 20 ++++-
 .../paimon/table/system/PartitionsTableTest.java   | 96 ++++++++++++++++++++++
 3 files changed, 116 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
index d2fe41315..9dacf9d5f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/PartitionsTable.java
@@ -274,8 +274,7 @@ public class PartitionsTable implements ReadonlyTable {
                 // Grouping and summing
                 Partition rowData =
                         groupedData.computeIfAbsent(
-                                partitionId,
-                                key -> new Partition(partitionId, recordCount, fileSizeInBytes));
+                                partitionId, key -> new Partition(partitionId, 0, 0));
                 rowData.recordCount += recordCount;
                 rowData.fileSizeInBytes += fileSizeInBytes;
             }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 9f305182f..322d0c8ad 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -40,12 +40,15 @@ import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Projection;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -116,15 +119,30 @@ public abstract class TableTestBase {
 
     protected List<InternalRow> read(Table table, Pair<ConfigOption<?>, String>... dynamicOptions)
             throws Exception {
+        return read(table, null, dynamicOptions);
+    }
+
+    protected List<InternalRow> read(
+            Table table,
+            @Nullable int[][] projection,
+            Pair<ConfigOption<?>, String>... dynamicOptions)
+            throws Exception {
         Map<String, String> options = new HashMap<>();
         for (Pair<ConfigOption<?>, String> pair : dynamicOptions) {
             options.put(pair.getKey().key(), pair.getValue());
         }
         table = table.copy(options);
         ReadBuilder readBuilder = table.newReadBuilder();
+        if (projection != null) {
+            readBuilder.withProjection(projection);
+        }
         RecordReader<InternalRow> reader =
                 readBuilder.newRead().createReader(readBuilder.newScan().plan());
-        InternalRowSerializer serializer = new InternalRowSerializer(table.rowType());
+        InternalRowSerializer serializer =
+                new InternalRowSerializer(
+                        projection == null
+                                ? table.rowType()
+                                : Projection.of(projection).project(table.rowType()));
         List<InternalRow> rows = new ArrayList<>();
         reader.forEachRemaining(row -> rows.add(serializer.copy(row)));
         return rows;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
new file mode 100644
index 000000000..0a9a0fe52
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PartitionsTableTest.java
@@ -0,0 +1,96 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link PartitionsTable}. */
+public class PartitionsTableTest extends TableTestBase {
+
+    private static final String tableName = "MyTable";
+
+    private FileStoreTable table;
+
+    private PartitionsTable partitionsTable;
+
+    @BeforeEach
+    public void before() throws Exception {
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, database, tableName));
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .option(CoreOptions.BUCKET.key(), "2")
+                        .build();
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(new SchemaManager(fileIO, tablePath), schema);
+        table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath, tableSchema);
+
+        Identifier filesTableId =
+                identifier(tableName + Catalog.SYSTEM_TABLE_SPLITTER + PartitionsTable.PARTITIONS);
+        partitionsTable = (PartitionsTable) catalog.getTable(filesTableId);
+
+        // snapshot 1: append
+        write(table, GenericRow.of(1, 1, 1), GenericRow.of(1, 2, 5));
+
+        write(table, GenericRow.of(1, 1, 3), GenericRow.of(1, 2, 4));
+    }
+
+    @Test
+    public void testPartitionRecordCount() throws Exception {
+        List<InternalRow> expectedRow = new ArrayList<>();
+        expectedRow.add(GenericRow.of(BinaryString.fromString("[1]"), 2L));
+        expectedRow.add(GenericRow.of(BinaryString.fromString("[2]"), 2L));
+
+        // Only read partition and record count, record size may not be stable.
+        List<InternalRow> result = read(partitionsTable, new int[][] {{0}, {1}});
+        assertThat(result).containsExactlyInAnyOrderElementsOf(expectedRow);
+    }
+}

Reply via email to