This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2314a1de9 [core] Add time to TableSchema (#1716)
2314a1de9 is described below
commit 2314a1de9d8e0a12582dc6a8979d5dcb4645f709
Author: shidayang <[email protected]>
AuthorDate: Mon Aug 14 11:06:40 2023 +0800
[core] Add time to TableSchema (#1716)
---
docs/content/how-to/system-tables.md | 14 +--
.../org/apache/paimon/schema/SchemaSerializer.java | 13 ++-
.../java/org/apache/paimon/schema/TableSchema.java | 36 +++++++-
.../apache/paimon/table/system/SchemasTable.java | 14 ++-
.../paimon/table/system/SchemasTableTest.java | 101 +++++++++++++++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 23 ++++-
.../apache/paimon/hive/HiveCatalogITCaseBase.java | 10 +-
7 files changed, 193 insertions(+), 18 deletions(-)
diff --git a/docs/content/how-to/system-tables.md
b/docs/content/how-to/system-tables.md
index 7f25accd7..0077cf85d 100644
--- a/docs/content/how-to/system-tables.md
+++ b/docs/content/how-to/system-tables.md
@@ -65,13 +65,13 @@ You can query the historical schemas of the table through
schemas table.
SELECT * FROM MyTable$schemas;
/*
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| schema_id | fields | partition_keys | primary_keys |
options | comment |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
-| 0 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-| 1 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-| 2 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | |
-+-----------+--------------------------------+----------------+--------------+---------+---------+
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
+| schema_id | fields | partition_keys | primary_keys |
options | comment | update_time |
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
+| 0 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | | 2022-10-26 11:44:10.600 |
+| 1 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | | 2022-10-27 11:44:15.600 |
+| 2 | [{"id":0,"name":"word","typ... | [] | ["word"] |
{} | | 2022-10-28 11:44:20.600 |
++-----------+--------------------------------+----------------+--------------+---------+---------+-------------------------+
3 rows in set
*/
```
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
index 6fdd9d929..af78e3b97 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaSerializer.java
@@ -76,6 +76,8 @@ public class SchemaSerializer
generator.writeStringField("comment", tableSchema.comment());
}
+ generator.writeNumberField("timeMillis", tableSchema.timeMillis());
+
generator.writeEndObject();
}
@@ -117,7 +119,16 @@ public class SchemaSerializer
comment = commentNode.asText();
}
+ long timeMillis = node.get("timeMillis") == null ? 0 :
node.get("timeMillis").asLong();
+
return new TableSchema(
- id, fields, highestFieldId, partitionKeys, primaryKeys,
options, comment);
+ id,
+ fields,
+ highestFieldId,
+ partitionKeys,
+ primaryKeys,
+ options,
+ comment,
+ timeMillis);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
index dd1d6bb6e..feec6eb50 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/TableSchema.java
@@ -58,6 +58,8 @@ public class TableSchema implements Serializable {
private final String comment;
+ private final long timeMillis;
+
public TableSchema(
long id,
List<DataField> fields,
@@ -66,6 +68,26 @@ public class TableSchema implements Serializable {
List<String> primaryKeys,
Map<String, String> options,
String comment) {
+ this(
+ id,
+ fields,
+ highestFieldId,
+ partitionKeys,
+ primaryKeys,
+ options,
+ comment,
+ System.currentTimeMillis());
+ }
+
+ public TableSchema(
+ long id,
+ List<DataField> fields,
+ int highestFieldId,
+ List<String> partitionKeys,
+ List<String> primaryKeys,
+ Map<String, String> options,
+ String comment,
+ long timeMillis) {
this.id = id;
this.fields = fields;
this.highestFieldId = highestFieldId;
@@ -73,6 +95,7 @@ public class TableSchema implements Serializable {
this.primaryKeys = primaryKeys;
this.options = Collections.unmodifiableMap(options);
this.comment = comment;
+ this.timeMillis = timeMillis;
// try to trim to validate primary keys
trimmedPrimaryKeys();
@@ -186,6 +209,10 @@ public class TableSchema implements Serializable {
return comment;
}
+ public long timeMillis() {
+ return timeMillis;
+ }
+
public RowType logicalRowType() {
return new RowType(fields);
}
@@ -224,7 +251,14 @@ public class TableSchema implements Serializable {
public TableSchema copy(Map<String, String> newOptions) {
return new TableSchema(
- id, fields, highestFieldId, partitionKeys, primaryKeys,
newOptions, comment);
+ id,
+ fields,
+ highestFieldId,
+ partitionKeys,
+ primaryKeys,
+ newOptions,
+ comment,
+ timeMillis);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
index eed7d4d7b..b56b48539 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/SchemasTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -38,6 +39,7 @@ import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.apache.paimon.utils.ProjectedRow;
@@ -46,6 +48,9 @@ import org.apache.paimon.utils.SerializationUtils;
import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import java.io.IOException;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
@@ -72,7 +77,8 @@ public class SchemasTable implements ReadonlyTable {
new DataField(
3, "primary_keys",
SerializationUtils.newStringType(false)),
new DataField(4, "options",
SerializationUtils.newStringType(false)),
- new DataField(5, "comment",
SerializationUtils.newStringType(true))));
+ new DataField(5, "comment",
SerializationUtils.newStringType(true)),
+ new DataField(6, "update_time", new
TimestampType(false, 3))));
private final FileIO fileIO;
private final Path location;
@@ -211,7 +217,11 @@ public class SchemasTable implements ReadonlyTable {
toJson(schema.partitionKeys()),
toJson(schema.primaryKeys()),
toJson(schema.options()),
- BinaryString.fromString(schema.comment()));
+ BinaryString.fromString(schema.comment()),
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(schema.timeMillis()),
+ ZoneId.systemDefault())));
}
private BinaryString toJson(Object obj) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
new file mode 100644
index 000000000..a10b2e1d7
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/table/system/SchemasTableTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.JsonSerdeUtil.toFlatJson;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link SchemasTable}. */
+public class SchemasTableTest extends TableTestBase {
+
+ private SchemasTable schemasTable;
+ private SchemaManager schemaManager;
+
+ @BeforeEach
+ public void before() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("pk", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .column("col1", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("pk", "pt")
+ .option(CoreOptions.CHANGELOG_PRODUCER.key(), "input")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ schemasTable = (SchemasTable)
catalog.getTable(identifier("T$schemas"));
+
+ FileIO fileIO = LocalFileIO.create();
+ Path tablePath = new Path(String.format("%s/%s.db/%s", warehouse,
database, "T"));
+ schemaManager = new SchemaManager(fileIO, tablePath);
+ }
+
+ @Test
+ public void testSchemasTable() throws Exception {
+ List<InternalRow> expectRow = getExceptedResult();
+ List<InternalRow> result = read(schemasTable);
+ assertThat(result).containsExactlyElementsOf(expectRow);
+ }
+
+ private List<InternalRow> getExceptedResult() {
+ List<TableSchema> tableSchemas = schemaManager.listAll();
+
+ List<InternalRow> expectedRow = new ArrayList<>();
+ for (TableSchema schema : tableSchemas) {
+ expectedRow.add(
+ GenericRow.of(
+ schema.id(),
+
BinaryString.fromString(toFlatJson(schema.fields())),
+
BinaryString.fromString(toFlatJson(schema.partitionKeys())),
+
BinaryString.fromString(toFlatJson(schema.primaryKeys())),
+
BinaryString.fromString(toFlatJson(schema.options())),
+ BinaryString.fromString(schema.comment()),
+ Timestamp.fromLocalDateTime(
+ LocalDateTime.ofInstant(
+
Instant.ofEpochMilli(schema.timeMillis()),
+ ZoneId.systemDefault()))));
+ }
+ return expectedRow;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index bce8c48ed..b9a148d4b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -180,10 +180,14 @@ public class CatalogTableITCase extends CatalogITCaseBase
{
+ " `partition_keys` VARCHAR(2147483647) NOT
NULL,\n"
+ " `primary_keys` VARCHAR(2147483647) NOT
NULL,\n"
+ " `options` VARCHAR(2147483647) NOT NULL,\n"
- + " `comment` VARCHAR(2147483647)\n"
+ + " `comment` VARCHAR(2147483647),\n"
+ + " `update_time` TIMESTAMP(3) NOT NULL\n"
+ ") ]]");
- List<Row> result = sql("SELECT * FROM T$schemas order by schema_id");
+ List<Row> result =
+ sql(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
T$schemas order by schema_id");
assertThat(result.toString())
.isEqualTo(
@@ -222,7 +226,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
public void testCreateTableLike() throws Exception {
sql("CREATE TABLE T (a INT)");
sql("CREATE TABLE T1 LIKE T");
- List<Row> result = sql("SELECT * FROM T1$schemas s");
+ List<Row> result =
+ sql(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
T1$schemas s");
assertThat(result.toString())
.isEqualTo("[+I[0,
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
}
@@ -232,7 +239,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
sql("CREATE TABLE t (a INT)");
sql("INSERT INTO t VALUES(1),(2)");
sql("CREATE TABLE t1 AS SELECT * FROM t");
- List<Row> result = sql("SELECT * FROM t1$schemas s");
+ List<Row> result =
+ sql(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
t1$schemas s");
assertThat(result.toString())
.isEqualTo("[+I[0,
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
List<Row> data = sql("SELECT * FROM t1");
@@ -249,7 +259,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
+ ") PARTITIONED BY (dt, hh)");
sql("INSERT INTO t_p SELECT 1,2,'a','2023-02-19','12'");
sql("CREATE TABLE t1_p WITH ('partition' = 'dt' ) AS SELECT * FROM
t_p");
- List<Row> resultPartition = sql("SELECT * FROM t1_p$schemas s");
+ List<Row> resultPartition =
+ sql(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
t1_p$schemas s");
assertThat(resultPartition.toString())
.isEqualTo(
"[+I[0,
[{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},"
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index e970377de..c2ceba3bc 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -351,7 +351,10 @@ public abstract class HiveCatalogITCaseBase {
tEnv.executeSql("CREATE TABLE t (a INT)").await();
tEnv.executeSql("INSERT INTO t VALUES(1)").await();
tEnv.executeSql("CREATE TABLE t1 AS SELECT * FROM t").await();
- List<Row> result = collect("SELECT * FROM t1$schemas s");
+ List<Row> result =
+ collect(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
t1$schemas s");
assertThat(result.toString())
.isEqualTo("[+I[0,
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
List<Row> data = collect("SELECT * FROM t1");
@@ -379,7 +382,10 @@ public abstract class HiveCatalogITCaseBase {
+ ") PARTITIONED BY (dt, hh)");
tEnv.executeSql("INSERT INTO t_p SELECT
1,2,'a','2023-02-19','12'").await();
tEnv.executeSql("CREATE TABLE t1_p WITH ('partition' = 'dt') AS SELECT
* FROM t_p").await();
- List<Row> resultPartition = collect("SELECT * FROM t1_p$schemas s");
+ List<Row> resultPartition =
+ collect(
+ "SELECT schema_id, fields, partition_keys, "
+ + "primary_keys, options, `comment` FROM
t1_p$schemas s");
assertThat(resultPartition.toString())
.isEqualTo(
"[+I[0,
[{\"id\":0,\"name\":\"user_id\",\"type\":\"BIGINT\"},{\"id\":1,\"name\":\"item_id\",\"type\":\"BIGINT\"},{\"id\":2,\"name\":\"behavior\",\"type\":\"STRING\"}"