This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2314a1de9 [core] Add time to TableSchema (#1716)
2314a1de9 is described below

commit 2314a1de9d8e0a12582dc6a8979d5dcb4645f709
Author: shidayang <[email protected]>
AuthorDate: Mon Aug 14 11:06:40 2023 +0800

    [core] Add time to TableSchema (#1716)
---
 docs/content/how-to/system-tables.md               |  14 +--
 .../org/apache/paimon/schema/SchemaSerializer.java |  13 ++-
 .../java/org/apache/paimon/schema/TableSchema.java |  36 +++++++-
 .../apache/paimon/table/system/SchemasTable.java   |  14 ++-
 .../paimon/table/system/SchemasTableTest.java      | 101 +++++++++++++++++++++
 .../apache/paimon/flink/CatalogTableITCase.java    |  23 ++++-
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  |  10 +-
 7 files changed, 193 insertions(+), 18 deletions(-)

diff --git a/docs/content/how-to/system-tables.md 
b/docs/content/how-to/system-tables.md
index 7f25accd7..0077cf85d 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -65,13 +65,13 @@ You can query the historical schemas of the table through 
schemas table.
 SELECT * FROM MyTable$schemas;
 
 /*
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| schema_id |                         fields | partition_keys | primary_keys | 
options | comment |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-|         0 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-|         1 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-|         2 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
+| schema_id |                         fields | partition_keys | primary_keys | 
options | comment |       update_time       |
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
+|         0 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         | 2022-10-28 11:44:20.600 |
+|         1 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         | 2022-10-27 11:44:15.600 |
+|         2 | [{"id":0,"name":"word","typ... |             [] |     ["word"] | 
     {} |         | 2022-10-26 11:44:10.600 |
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
 3 rows in set
 */
 ```
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
index 6fdd9d929..af78e3b97 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
@@ -76,6 +76,8 @@ public class SchemaSerializer
             generator.writeStringField("comment", tableSchema.comment());
         }
 
+        generator.writeNumberField("timeMillis", tableSchema.timeMillis());
+
         generator.writeEndObject();
     }
 
@@ -117,7 +119,16 @@ public class SchemaSerializer
             comment = commentNode.asText();
         }
 
+        long timeMillis = node.get("timeMillis") == null ? 0 : 
node.get("timeMillis").asLong();
+
         return new TableSchema(
-                id, fields, highestFieldId, partitionKeys, primaryKeys, 
options, comment);
+                id,
+                fields,
+                highestFieldId,
+                partitionKeys,
+                primaryKeys,
+                options,
+                comment,
+                timeMillis);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index dd1d6bb6e..feec6eb50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -58,6 +58,8 @@ public class TableSchema implements Serializable {
 
     private final String comment;
 
+    private final long timeMillis;
+
     public TableSchema(
             long id,
             List<DataField> fields,
@@ -66,6 +68,26 @@ public class TableSchema implements Serializable {
             List<String> primaryKeys,
             Map<String, String> options,
             String comment) {
+        this(
+                id,
+                fields,
+                highestFieldId,
+                partitionKeys,
+                primaryKeys,
+                options,
+                comment,
+                System.currentTimeMillis());
+    }
+
+    public TableSchema(
+            long id,
+            List<DataField> fields,
+            int highestFieldId,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options,
+            String comment,
+            long timeMillis) {
         this.id = id;
         this.fields = fields;
         this.highestFieldId = highestFieldId;
@@ -73,6 +95,7 @@ public class TableSchema implements Serializable {
         this.primaryKeys = primaryKeys;
         this.options = Collections.unmodifiableMap(options);
         this.comment = comment;
+        this.timeMillis = timeMillis;
 
         // try to trim to validate primary keys
         trimmedPrimaryKeys();
@@ -186,6 +209,10 @@ public class TableSchema implements Serializable {
         return comment;
     }
 
+    public long timeMillis() {
+        return timeMillis;
+    }
+
     public RowType logicalRowType() {
         return new RowType(fields);
     }
@@ -224,7 +251,14 @@ public class TableSchema implements Serializable {
 
     public TableSchema copy(Map<String, String> newOptions) {
         return new TableSchema(
-                id, fields, highestFieldId, partitionKeys, primaryKeys, 
newOptions, comment);
+                id,
+                fields,
+                highestFieldId,
+                partitionKeys,
+                primaryKeys,
+                newOptions,
+                comment,
+                timeMillis);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index eed7d4d7b..b56b48539 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.IteratorRecordReader;
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.ProjectedRow;
@@ -46,6 +48,9 @@ import org.apache.paimon.utils.SerializationUtils;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
 
 import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -72,7 +77,8 @@ public class SchemasTable implements ReadonlyTable {
                             new DataField(
                                     3, "primary_keys", 
SerializationUtils.newStringType(false)),
                             new DataField(4, "options", 
SerializationUtils.newStringType(false)),
-                            new DataField(5, "comment", 
SerializationUtils.newStringType(true))));
+                            new DataField(5, "comment", 
SerializationUtils.newStringType(true)),
+                            new DataField(6, "update_time", new 
TimestampType(false, 3))));
 
     private final FileIO fileIO;
     private final Path location;
@@ -211,7 +217,11 @@ public class SchemasTable implements ReadonlyTable {
                     toJson(schema.partitionKeys()),
                     toJson(schema.primaryKeys()),
                     toJson(schema.options()),
-                    BinaryString.fromString(schema.comment()));
+                    BinaryString.fromString(schema.comment()),
+                    Timestamp.fromLocalDateTime(
+                            LocalDateTime.ofInstant(
+                                    Instant.ofEpochMilli(schema.timeMillis()),
+                                    ZoneId.systemDefault())));
         }
 
         private BinaryString toJson(Object obj) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
new file mode 100644
index 000000000..a10b2e1d7
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.toFlatJson;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link SchemasTable}. */
+public class SchemasTableTest extends TableTestBase {
+
+    private SchemasTable schemasTable;
+    private SchemaManager schemaManager;
+
+    @BeforeEach
+    public void before() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pk", DataTypes.INT())
+                        .column("pt", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        schemasTable = (SchemasTable) 
catalog.getTable(identifier("T$schemas"));
+
+        FileIO fileIO = LocalFileIO.create();
+        Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse, 
database, "T"));
+        schemaManager = new SchemaManager(fileIO, tablePath);
+    }
+
+    @Test
+    public void testSchemasTable() throws Exception {
+        List<InternalRow> expectRow = getExceptedResult();
+        List<InternalRow> result = read(schemasTable);
+        assertThat(result).containsExactlyElementsOf(expectRow);
+    }
+
+    private List<InternalRow> getExceptedResult() {
+        List<TableSchema> tableSchemas = schemaManager.listAll();
+
+        List<InternalRow> expectedRow = new ArrayList<>();
+        for (TableSchema schema : tableSchemas) {
+            expectedRow.add(
+                    GenericRow.of(
+                            schema.id(),
+                            
BinaryString.fromString(toFlatJson(schema.fields())),
+                            
BinaryString.fromString(toFlatJson(schema.partitionKeys())),
+                            
BinaryString.fromString(toFlatJson(schema.primaryKeys())),
+                            
BinaryString.fromString(toFlatJson(schema.options())),
+                            BinaryString.fromString(schema.comment()),
+                            Timestamp.fromLocalDateTime(
+                                    LocalDateTime.ofInstant(
+                                            
Instant.ofEpochMilli(schema.timeMillis()),
+                                            ZoneId.systemDefault()))));
+        }
+        return expectedRow;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bce8c48ed..b9a148d4b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -180,10 +180,14 @@ public class CatalogTableITCase extends CatalogITCaseBase 
{
                                 + "  `partition_keys` VARCHAR(2147483647) NOT 
NULL,\n"
                                 + "  `primary_keys` VARCHAR(2147483647) NOT 
NULL,\n"
                                 + "  `options` VARCHAR(2147483647) NOT NULL,\n"
-                                + "  `comment` VARCHAR(2147483647)\n"
+                                + "  `comment` VARCHAR(2147483647),\n"
+                                + "  `update_time` TIMESTAMP(3) NOT NULL\n"
                                 + ") ]]");
 
-        List<Row> result = sql("SELECT * FROM T$schemas order by schema_id");
+        List<Row> result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
T$schemas order by schema_id");
 
         assertThat(result.toString())
                 .isEqualTo(
@@ -222,7 +226,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
     public void testCreateTableLike() throws Exception {
         sql("CREATE TABLE T (a INT)");
         sql("CREATE TABLE T1 LIKE T");
-        List<Row> result = sql("SELECT * FROM T1$schemas s");
+        List<Row> result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
T1$schemas s");
         assertThat(result.toString())
                 .isEqualTo("[+I[0, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
     }
@@ -232,7 +239,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
         sql("CREATE TABLE t (a INT)");
         sql("INSERT INTO t VALUES(1),(2)");
         sql("CREATE TABLE t1 AS SELECT * FROM t");
-        List<Row> result = sql("SELECT * FROM t1$schemas s");
+        List<Row> result =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
t1$schemas s");
         assertThat(result.toString())
                 .isEqualTo("[+I[0, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
         List<Row> data = sql("SELECT * FROM t1");
@@ -249,7 +259,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         + ") PARTITIONED BY (dt, hh)");
         sql("INSERT INTO t_p SELECT 1,2,'a','2023-02-19','12'");
         sql("CREATE TABLE t1_p WITH ('partition' = 'dt' ) AS SELECT * FROM 
t_p");
-        List<Row> resultPartition = sql("SELECT * FROM t1_p$schemas s");
+        List<Row> resultPartition =
+                sql(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment` FROM 
t1_p$schemas s");
         assertThat(resultPartition.toString())
                 .isEqualTo(
                         "[+I[0, 
[{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},"
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e970377de..c2ceba3bc 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -351,7 +351,10 @@ public abstract class HiveCatalogITCaseBase {
         tEnv.executeSql("CREATE TABLE t (a INT)").await();
         tEnv.executeSql("INSERT INTO t VALUES(1)").await();
         tEnv.executeSql("CREATE TABLE t1 AS SELECT * FROM t").await();
-        List<Row> result = collect("SELECT * FROM t1$schemas s");
+        List<Row> result =
+                collect(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment`  FROM 
t1$schemas s");
         assertThat(result.toString())
                 .isEqualTo("[+I[0, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
         List<Row> data = collect("SELECT * FROM t1");
@@ -379,7 +382,10 @@ public abstract class HiveCatalogITCaseBase {
                         + ") PARTITIONED BY (dt, hh)");
         tEnv.executeSql("INSERT INTO t_p  SELECT 
1,2,'a','2023-02-19','12'").await();
         tEnv.executeSql("CREATE TABLE t1_p WITH ('partition' = 'dt') AS SELECT 
* FROM t_p").await();
-        List<Row> resultPartition = collect("SELECT * FROM t1_p$schemas s");
+        List<Row> resultPartition =
+                collect(
+                        "SELECT schema_id, fields, partition_keys, "
+                                + "primary_keys, options, `comment`  FROM 
t1_p$schemas s");
         assertThat(resultPartition.toString())
                 .isEqualTo(
                         "[+I[0, 
[{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"}"

Reply via email to