This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new a71ec64a [hotfix] Enable junit4 tests in core and fix TableStoreCatalogTest
a71ec64a is described below

commit a71ec64a4de066c4aac7af443a0f90c40cee9503
Author: JingsongLi <[email protected]>
AuthorDate: Tue Jun 14 18:06:18 2022 +0800

    [hotfix] Enable junit4 tests in core and fix TableStoreCatalogTest
---
 flink-table-store-core/pom.xml                     | 13 ++++
 .../store/file/catalog/FileSystemCatalog.java      |  4 +-
 .../store/file/catalog/TableStoreCatalogTest.java  | 89 +++++++++++++++++++++-
 3 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/flink-table-store-core/pom.xml b/flink-table-store-core/pom.xml
index b039068f..e371d9eb 100644
--- a/flink-table-store-core/pom.xml
+++ b/flink-table-store-core/pom.xml
@@ -124,6 +124,19 @@ under the License.
             <version>${junit4.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <version>${junit5.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>junit</groupId>
+                    <artifactId>junit</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index c772a867..027414bb 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.file.catalog;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -272,7 +273,8 @@ public class FileSystemCatalog extends TableStoreCatalog {
         return new Path(warehouse, database + DB_SUFFIX);
     }
 
-    private Path tablePath(ObjectPath objectPath) {
+    @VisibleForTesting
+    Path tablePath(ObjectPath objectPath) {
         return new Path(databasePath(objectPath.getDatabaseName()), objectPath.getObjectName());
     }
 
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
index 60c52b05..83f13f16 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/catalog/TableStoreCatalogTest.java
@@ -18,13 +18,16 @@
 
 package org.apache.flink.table.store.file.catalog;
 
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
 import org.apache.flink.table.catalog.CatalogDatabaseImpl;
 import org.apache.flink.table.catalog.CatalogFunction;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTestBase;
 import org.apache.flink.table.catalog.CatalogTestUtil;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
@@ -35,6 +38,8 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -46,6 +51,24 @@ public abstract class TableStoreCatalogTest extends CatalogTestBase {
         return false;
     }
 
+    @Override
+    protected Map<String, String> getBatchTableProperties() {
+        return new HashMap<String, String>() {
+            {
+                this.put("is_streaming", "false");
+            }
+        };
+    }
+
+    @Override
+    protected Map<String, String> getStreamingTableProperties() {
+        return new HashMap<String, String>() {
+            {
+                this.put("is_streaming", "true");
+            }
+        };
+    }
+
     @Override
     public CatalogDatabase createDb() {
         return new CatalogDatabaseImpl(Collections.emptyMap(), "");
@@ -88,11 +111,11 @@ public abstract class TableStoreCatalogTest extends CatalogTestBase {
         catalog.createDatabase("db1", this.createDb(), false);
         CatalogTable table = this.createTable();
         catalog.createTable(this.path1, table, false);
-        CatalogTestUtil.checkEquals(table, (CatalogTable) catalog.getTable(this.path1));
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
         CatalogTable newTable = this.createAnotherTable();
         catalog.alterTable(this.path1, newTable, false);
         Assert.assertNotEquals(table, catalog.getTable(this.path1));
-        CatalogTestUtil.checkEquals(newTable, (CatalogTable) catalog.getTable(this.path1));
+        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
         catalog.dropTable(this.path1, false);
 
         // Not support views
@@ -129,6 +152,68 @@ public abstract class TableStoreCatalogTest extends CatalogTestBase {
                         "Table Store Catalog only supports table store tables, not Flink connector: filesystem");
     }
 
+    @Test
+    public void testCreateTable_Streaming() throws Exception {
+        catalog.createDatabase("db1", createDb(), false);
+        CatalogTable table = createStreamingTable();
+        catalog.createTable(path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(path1));
+    }
+
+    @Test
+    public void testAlterPartitionedTable() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        CatalogTable table = this.createPartitionedTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        CatalogTable newTable = this.createAnotherPartitionedTable();
+        catalog.alterTable(this.path1, newTable, false);
+        checkEquals(path1, newTable, (CatalogTable) catalog.getTable(this.path1));
+    }
+
+    @Test
+    public void testCreateTable_Batch() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        CatalogBaseTable tableCreated = catalog.getTable(this.path1);
+        checkEquals(path1, table, (CatalogTable) tableCreated);
+        Assert.assertEquals("test comment", tableCreated.getDescription().get());
+        List<String> tables = catalog.listTables("db1");
+        Assert.assertEquals(1L, tables.size());
+        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+        catalog.dropTable(this.path1, false);
+    }
+
+    @Test
+    public void testCreateTable_TableAlreadyExist_ignored() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        CatalogTable table = this.createTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        catalog.createTable(this.path1, this.createAnotherTable(), true);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+    }
+
+    @Test
+    public void testCreatePartitionedTable_Batch() throws Exception {
+        catalog.createDatabase("db1", this.createDb(), false);
+        CatalogTable table = this.createPartitionedTable();
+        catalog.createTable(this.path1, table, false);
+        checkEquals(path1, table, (CatalogTable) catalog.getTable(this.path1));
+        List<String> tables = catalog.listTables("db1");
+        Assert.assertEquals(1L, tables.size());
+        Assert.assertEquals(this.path1.getObjectName(), tables.get(0));
+    }
+
+    private void checkEquals(ObjectPath path, CatalogTable t1, CatalogTable t2) {
+        Path tablePath = ((FileSystemCatalog) catalog).tablePath(path);
+        Map<String, String> options = new HashMap<>(t1.getOptions());
+        options.put("path", tablePath.toString());
+        t1 = ((ResolvedCatalogTable) t1).copy(options);
+        CatalogTestUtil.checkEquals(t1, t2);
+    }
+
     // --------------------- unsupported methods ----------------------------
 
     @Override

Reply via email to