This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c54255abd [rest] Supports global system tables (#4880)
6c54255abd is described below

commit 6c54255abdd7cd3bfa9472d93925251fe63aeb9c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 10 19:55:41 2025 +0800

    [rest] Supports global system tables (#4880)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 30 +------
 .../org/apache/paimon/catalog/CatalogUtils.java    | 30 +++++++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  2 +-
 .../paimon/table/system/AllTableOptionsTable.java  | 92 +++++++-------------
 .../org/apache/paimon/catalog/CatalogTestBase.java |  8 --
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  5 --
 .../table/system/AllTableOptionsTableTest.java     |  3 +-
 .../apache/paimon/flink/CatalogTableITCase.java    | 71 +++++++++++++++-
 .../org/apache/paimon/flink/SystemTableITCase.java | 98 ----------------------
 9 files changed, 132 insertions(+), 207 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a4c47f54a6..702d5229cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -39,8 +39,6 @@ import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.system.AllTableOptionsTable;
-import org.apache.paimon.table.system.CatalogOptionsTable;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
@@ -66,8 +64,6 @@ import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSyste
 import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
 import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
 import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
-import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -372,15 +368,7 @@ public abstract class AbstractCatalog implements Catalog {
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException {
         if (isSystemDatabase(identifier.getDatabaseName())) {
-            String tableName = identifier.getTableName();
-            switch (tableName.toLowerCase()) {
-                case ALL_TABLE_OPTIONS:
-                    return new AllTableOptionsTable(fileIO, allTablePaths());
-                case CATALOG_OPTIONS:
-                    return new CatalogOptionsTable(catalogOptions);
-                default:
-                    throw new TableNotExistException(identifier);
-            }
+            return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
         } else if (identifier.isSystemTable()) {
             Table originTable =
                     getDataOrFormatTable(
@@ -454,22 +442,6 @@ public abstract class AbstractCatalog implements Catalog {
         return newDatabasePath(warehouse(), database);
     }
 
-    public Map<String, Map<String, Path>> allTablePaths() {
-        try {
-            Map<String, Map<String, Path>> allPaths = new HashMap<>();
-            for (String database : listDatabases()) {
-                Map<String, Path> tableMap =
-                        allPaths.computeIfAbsent(database, d -> new HashMap<>());
-                for (String table : listTables(database)) {
-                    tableMap.put(table, getTableLocation(Identifier.create(database, table)));
-                }
-            }
-            return allPaths;
-        } catch (DatabaseNotExistException e) {
-            throw new RuntimeException("Database is deleted while listing", e);
-        }
-    }
-
     protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException {
         return new TableMeta(getDataTableSchema(identifier), null);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 9267532f9d..9b69248d6d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -26,11 +26,14 @@ import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.AllTableOptionsTable;
+import org.apache.paimon.table.system.CatalogOptionsTable;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +42,8 @@ import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
 import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
+import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Utils for {@link Catalog}. */
@@ -121,6 +126,31 @@ public class CatalogUtils {
                         CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
     }
 
+    public static Table createGlobalSystemTable(String tableName, Catalog catalog)
+            throws Catalog.TableNotExistException {
+        switch (tableName.toLowerCase()) {
+            case ALL_TABLE_OPTIONS:
+                try {
+                    Map<Identifier, Map<String, String>> allOptions = new HashMap<>();
+                    for (String database : catalog.listDatabases()) {
+                        for (String name : catalog.listTables(database)) {
+                            Identifier identifier = Identifier.create(database, name);
+                            Table table = catalog.getTable(identifier);
+                            allOptions.put(identifier, table.options());
+                        }
+                    }
+                    return new AllTableOptionsTable(allOptions);
+                } catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) {
+                    throw new RuntimeException("Database is deleted while listing", e);
+                }
+            case CATALOG_OPTIONS:
+                return new CatalogOptionsTable(Options.fromMap(catalog.options()));
+            default:
+                throw new Catalog.TableNotExistException(
+                        Identifier.create(SYSTEM_DATABASE_NAME, tableName));
+        }
+    }
+
     public static Table createSystemTable(Identifier identifier, Table originTable)
             throws Catalog.TableNotExistException {
         if (!(originTable instanceof FileStoreTable)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 3f7647ca84..a807ad2c9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -273,7 +273,7 @@ public class RESTCatalog implements Catalog {
     @Override
     public Table getTable(Identifier identifier) throws TableNotExistException {
         if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
-            throw new UnsupportedOperationException("TODO support global system tables.");
+            return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
         } else if (identifier.isSystemTable()) {
             return getSystemTable(identifier);
         } else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index 13b5366a6a..b354a263c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -18,15 +18,13 @@
 
 package org.apache.paimon.table.system;
 
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.ReadonlyTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.InnerTableRead;
@@ -45,7 +43,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -68,13 +65,10 @@ public class AllTableOptionsTable implements ReadonlyTable {
 
     public static final String ALL_TABLE_OPTIONS = "all_table_options";
 
-    private final FileIO fileIO;
-    private final Map<String, Map<String, Path>> allTablePaths;
+    private final Map<Identifier, Map<String, String>> allOptions;
 
-    public AllTableOptionsTable(FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
-        // allTablePath is the map of  <database, <table_name, properties>>
-        this.fileIO = fileIO;
-        this.allTablePaths = allTablePaths;
+    public AllTableOptionsTable(Map<Identifier, Map<String, String>> allOptions) {
+        this.allOptions = allOptions;
     }
 
     @Override
@@ -104,12 +98,12 @@ public class AllTableOptionsTable implements ReadonlyTable {
 
     @Override
     public InnerTableRead newRead() {
-        return new AllTableOptionsRead(fileIO);
+        return new AllTableOptionsRead();
     }
 
     @Override
     public Table copy(Map<String, String> dynamicOptions) {
-        return new AllTableOptionsTable(fileIO, allTablePaths);
+        return new AllTableOptionsTable(allOptions);
     }
 
     private class AllTableOptionsScan extends ReadOnceTableScan {
@@ -121,7 +115,7 @@ public class AllTableOptionsTable implements ReadonlyTable {
 
         @Override
         public Plan innerPlan() {
-            return () -> Collections.singletonList(new AllTableSplit(allTablePaths));
+            return () -> Collections.singletonList(new AllTableSplit(allOptions));
         }
     }
 
@@ -129,10 +123,10 @@ public class AllTableOptionsTable implements ReadonlyTable {
 
         private static final long serialVersionUID = 1L;
 
-        private final Map<String, Map<String, Path>> allTablePaths;
+        private final Map<Identifier, Map<String, String>> allOptions;
 
-        private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
-            this.allTablePaths = allTablePaths;
+        private AllTableSplit(Map<Identifier, Map<String, String>> allOptions) {
+            this.allOptions = allOptions;
         }
 
         @Override
@@ -144,24 +138,19 @@ public class AllTableOptionsTable implements ReadonlyTable {
                 return false;
             }
             AllTableSplit that = (AllTableSplit) o;
-            return Objects.equals(allTablePaths, that.allTablePaths);
+            return Objects.equals(allOptions, that.allOptions);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(allTablePaths);
+            return Objects.hash(allOptions);
         }
     }
 
     private static class AllTableOptionsRead implements InnerTableRead {
 
-        private final FileIO fileIO;
         private RowType readType;
 
-        public AllTableOptionsRead(FileIO fileIO) {
-            this.fileIO = fileIO;
-        }
-
         @Override
         public InnerTableRead withFilter(Predicate predicate) {
             return this;
@@ -183,29 +172,12 @@ public class AllTableOptionsTable implements ReadonlyTable {
             if (!(split instanceof AllTableSplit)) {
                 throw new IllegalArgumentException("Unsupported split: " + split.getClass());
             }
-            Map<String, Map<String, Path>> location = ((AllTableSplit) split).allTablePaths;
-            Iterator<InternalRow> rows = toRow(options(fileIO, location));
-            if (readType != null) {
-                rows =
-                        Iterators.transform(
-                                rows,
-                                row ->
-                                        ProjectedRow.from(
-                                                        readType, AggregationFieldsTable.TABLE_TYPE)
-                                                .replaceRow(row));
-            }
-            return new IteratorRecordReader<>(rows);
-        }
-    }
-
-    protected static Iterator<InternalRow> toRow(
-            Map<String, Map<String, Map<String, String>>> option) {
-        List<InternalRow> rows = new ArrayList<>();
-        for (Map.Entry<String, Map<String, Map<String, String>>> entry0 : option.entrySet()) {
-            String database = entry0.getKey();
-            for (Map.Entry<String, Map<String, String>> entry1 : entry0.getValue().entrySet()) {
-                String tableName = entry1.getKey();
-                for (Map.Entry<String, String> entry2 : entry1.getValue().entrySet()) {
+            List<InternalRow> rows = new ArrayList<>();
+            for (Map.Entry<Identifier, Map<String, String>> entry :
+                    ((AllTableSplit) split).allOptions.entrySet()) {
+                String database = entry.getKey().getDatabaseName();
+                String tableName = entry.getKey().getTableName();
+                for (Map.Entry<String, String> entry2 : entry.getValue().entrySet()) {
                     String key = entry2.getKey();
                     String value = entry2.getValue();
                     rows.add(
@@ -216,25 +188,17 @@ public class AllTableOptionsTable implements ReadonlyTable {
                                     BinaryString.fromString(value)));
                 }
             }
-        }
-        return rows.iterator();
-    }
-
-    protected static Map<String, Map<String, Map<String, String>>> options(
-            FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
-        Map<String, Map<String, Map<String, String>>> allOptions = new HashMap<>();
-        for (Map.Entry<String, Map<String, Path>> entry0 : allTablePaths.entrySet()) {
-            Map<String, Map<String, String>> m0 =
-                    allOptions.computeIfAbsent(entry0.getKey(), k -> new HashMap<>());
-            for (Map.Entry<String, Path> entry1 : entry0.getValue().entrySet()) {
-                Map<String, String> options =
-                        new SchemaManager(fileIO, entry1.getValue())
-                                .latest()
-                                .orElseThrow(() -> new RuntimeException("Table not exists."))
-                                .options();
-                m0.put(entry1.getKey(), options);
+            Iterator<InternalRow> iterator = rows.iterator();
+            if (readType != null) {
+                iterator =
+                        Iterators.transform(
+                                iterator,
+                                row ->
+                                        ProjectedRow.from(
+                                                        readType, AggregationFieldsTable.TABLE_TYPE)
+                                                .replaceRow(row));
             }
+            return new IteratorRecordReader<>(iterator);
         }
-        return allOptions;
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index f7aa4ab5a6..6448972cde 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -426,10 +426,6 @@ public abstract class CatalogTestBase {
                         () -> catalog.getTable(Identifier.create("non_existing_db", "test_table")))
                 .withMessage("Table non_existing_db.test_table does not exist.");
 
-        // Get all table options from system database
-        if (!supportGetFromSystemDatabase()) {
-            return;
-        }
         Table allTableOptionsTable =
                 catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, ALL_TABLE_OPTIONS));
         assertThat(allTableOptionsTable).isNotNull();
@@ -1029,10 +1025,6 @@ public abstract class CatalogTestBase {
                 .isGreaterThan(0);
     }
 
-    protected boolean supportGetFromSystemDatabase() {
-        return true;
-    }
-
     protected boolean supportsAlterDatabase() {
         return false;
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b34ca1e5ac..4bbfcde215 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -67,11 +67,6 @@ class RESTCatalogTest extends CatalogTestBase {
         restCatalogServer.shutdown();
     }
 
-    @Override
-    protected boolean supportGetFromSystemDatabase() {
-        return false;
-    }
-
     @Test
     void testInitFailWhenDefineWarehouse() {
         Options options = new Options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
index 764c0f4e16..16e3baadfa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
@@ -59,11 +59,12 @@ public class AllTableOptionsTableTest extends TableTestBase {
     }
 
     @Test
-    public void testSchemasTable() throws Exception {
+    public void testAllTableOptionsTable() throws Exception {
         List<String> result =
                 read(allTableOptionsTable).stream()
                         .map(Objects::toString)
                         .collect(Collectors.toList());
+        result = result.stream().filter(r -> !r.contains("path")).collect(Collectors.toList());
         assertThat(result)
                 .containsExactlyInAnyOrder(
                         "+I(default,T,fields.sales.aggregate-function,sum)",
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b82b511b63..8cd6afbb4d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -158,7 +158,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         sql("CREATE TABLE T (a INT, b INT) with ('a.aa.aaa'='val1', 'b.bb.bbb'='val2')");
         sql("ALTER TABLE T SET ('c.cc.ccc' = 'val3')");
 
-        List<Row> result = sql("SELECT * FROM sys.all_table_options");
+        List<Row> result =
+                sql("SELECT * FROM sys.all_table_options").stream()
+                        .filter(row -> !row.getField(2).equals("path"))
+                        .collect(Collectors.toList());
         assertThat(result)
                 .containsExactly(
                         Row.of("default", "T", "a.aa.aaa", "val1"),
@@ -1100,6 +1103,72 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         innerTestReadOptimizedTable();
     }
 
+    @Test
+    public void testBinlogTableStreamRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
+                        + "'bucket' = '2')");
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows = iterator.collect(3);
+        assertThat(rows)
+                .containsExactly(
+                        Row.of("+I", new Integer[] {1}, new Integer[] {2}),
+                        Row.of("+U", new Integer[] {1, 1}, new Integer[] {2, 3}),
+                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+        iterator.close();
+    }
+
+    @Test
+    public void testBinlogTableBatchRead() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
+                        + "'bucket' = '2')");
+        sql("INSERT INTO T VALUES (1, 2)");
+        sql("INSERT INTO T VALUES (1, 3)");
+        sql("INSERT INTO T VALUES (2, 2)");
+        List<Row> rows = sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
+        assertThat(rows)
+                .containsExactly(
+                        Row.of("+I", new Integer[] {1}, new Integer[] {3}),
+                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+    }
+
+    @Test
+    public void testIndexesTable() {
+        sql(
+                "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED)"
+                        + " PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true')");
+        sql(
+                "INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
+        sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')");
+
+        List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'HASH'");
+        assertThat(rows.size()).isEqualTo(1);
+        Row row = rows.get(0);
+        assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
+        assertThat(row.getField(1)).isEqualTo(0);
+        assertThat(row.getField(2)).isEqualTo("HASH");
+        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+        assertThat(row.getField(4)).isEqualTo(12L);
+        assertThat(row.getField(5)).isEqualTo(3L);
+        assertThat(row.getField(6)).isNull();
+
+        rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'DELETION_VECTORS'");
+        assertThat(rows.size()).isEqualTo(1);
+        row = rows.get(0);
+        assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
+        assertThat(row.getField(1)).isEqualTo(0);
+        assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
+        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+        assertThat(row.getField(4)).isEqualTo(33L);
+        assertThat(row.getField(5)).isEqualTo(1L);
+        assertThat(row.getField(6)).isNotNull();
+    }
+
     private void innerTestReadOptimizedTable() {
         // full compaction will always be performed at the end of batch jobs, as long as
         // full-compaction.delta-commits is set, regardless of its value
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
deleted file mode 100644
index e28078052b..0000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.utils.BlockingIterator;
-
-import org.apache.flink.types.Row;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for system table. */
-public class SystemTableITCase extends CatalogTableITCase {
-
-    @Test
-    public void testBinlogTableStreamRead() throws Exception {
-        sql(
-                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
-                        + "'bucket' = '2')");
-        BlockingIterator<Row, Row> iterator =
-                streamSqlBlockIter("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
-        sql("INSERT INTO T VALUES (1, 2)");
-        sql("INSERT INTO T VALUES (1, 3)");
-        sql("INSERT INTO T VALUES (2, 2)");
-        List<Row> rows = iterator.collect(3);
-        assertThat(rows)
-                .containsExactly(
-                        Row.of("+I", new Integer[] {1}, new Integer[] {2}),
-                        Row.of("+U", new Integer[] {1, 1}, new Integer[] {2, 3}),
-                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
-        iterator.close();
-    }
-
-    @Test
-    public void testBinlogTableBatchRead() throws Exception {
-        sql(
-                "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED) with ('changelog-producer' = 'lookup', "
-                        + "'bucket' = '2')");
-        sql("INSERT INTO T VALUES (1, 2)");
-        sql("INSERT INTO T VALUES (1, 3)");
-        sql("INSERT INTO T VALUES (2, 2)");
-        List<Row> rows = sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' = 'latest') */");
-        assertThat(rows)
-                .containsExactly(
-                        Row.of("+I", new Integer[] {1}, new Integer[] {3}),
-                        Row.of("+I", new Integer[] {2}, new Integer[] {2}));
-    }
-
-    @Test
-    public void testIndexesTable() {
-        sql(
-                "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt, a) NOT ENFORCED)"
-                        + " PARTITIONED BY (pt) with ('deletion-vectors.enabled'='true')");
-        sql(
-                "INSERT INTO T VALUES ('2024-10-01', 1, 'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
-        sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01', 3, 'c_new1')");
-
-        List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'HASH'");
-        assertThat(rows.size()).isEqualTo(1);
-        Row row = rows.get(0);
-        assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
-        assertThat(row.getField(1)).isEqualTo(0);
-        assertThat(row.getField(2)).isEqualTo("HASH");
-        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
-        assertThat(row.getField(4)).isEqualTo(12L);
-        assertThat(row.getField(5)).isEqualTo(3L);
-        assertThat(row.getField(6)).isNull();
-
-        rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type = 'DELETION_VECTORS'");
-        assertThat(rows.size()).isEqualTo(1);
-        row = rows.get(0);
-        assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
-        assertThat(row.getField(1)).isEqualTo(0);
-        assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
-        assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
-        assertThat(row.getField(4)).isEqualTo(33L);
-        assertThat(row.getField(5)).isEqualTo(1L);
-        assertThat(row.getField(6)).isNotNull();
-    }
-}

Reply via email to