This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 714d1cb2e0bd0df03393492dc87cbd800af63e1b
Author: fengli <ldliu...@163.com>
AuthorDate: Wed Apr 24 18:18:47 2024 +0800

    [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table
---
 .../flink-table-filesystem-test-utils/pom.xml      | 103 +++
 .../flink/connector/file/src/TestFileSource.java   | 189 ++++++
 .../file/table/TestFileSystemTableSource.java      |  88 +++
 .../file/testutils/TestFileSystemTableFactory.java |  54 ++
 .../file/testutils/catalog/JsonSerdeUtil.java      |  61 ++
 .../testutils/catalog/TestFileSystemCatalog.java   | 690 +++++++++++++++++++++
 .../catalog/TestFileSystemCatalogFactory.java      |  78 +++
 .../org.apache.flink.table.factories.Factory       |  17 +
 .../testutils/TestFileSystemTableFactoryTest.java  |  64 ++
 .../catalog/TestFileSystemCatalogFactoryTest.java  |  92 +++
 .../catalog/TestFileSystemCatalogITCase.java       |  89 +++
 .../catalog/TestFileSystemCatalogTest.java         | 379 +++++++++++
 .../catalog/TestFileSystemCatalogTestBase.java     |  56 ++
 .../src/test/resources/log4j2-test.properties      |  28 +
 flink-test-utils-parent/pom.xml                    |   1 +
 15 files changed, 1989 insertions(+)
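
For context, a minimal, hypothetical sketch of how the new test-filesystem
catalog is expected to be registered from a TableEnvironment (the catalog name
and path below are illustrative placeholders, not taken from this commit;
'type' = 'test-filesystem' matches the factory identifier introduced below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class TestFileSystemCatalogUsage {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'type' matches TestFileSystemTableFactory.IDENTIFIER;
            // 'path' must point at an existing directory.
            tEnv.executeSql(
                    "CREATE CATALOG test_catalog WITH ("
                            + " 'type' = 'test-filesystem',"
                            + " 'path' = '/tmp/test-catalog',"
                            + " 'default-database' = 'default')");
            tEnv.executeSql("USE CATALOG test_catalog");
        }
    }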

diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml
new file mode 100644
index 00000000000..e1c8fdce665
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-test-utils-parent</artifactId>
+        <version>1.20-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-table-filesystem-test-utils</artifactId>
+    <name>Flink : Test utils : Table Filesystem</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
new file mode 100644
index 00000000000..5d0e9ace77f
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.DynamicParallelismInference;
+import org.apache.flink.connector.file.src.assigners.FileSplitAssigner;
+import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner;
+import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.enumerate.FileEnumerator;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A unified data source that reads files - both in batch and in streaming mode. This is used
+ * only for tests. Since {@link FileSource} is a final class, we cannot extend it directly.
+ *
+ * @param <T> The type of the events/records produced by this source.
+ */
+@Internal
+public class TestFileSource<T> extends AbstractFileSource<T, FileSourceSplit>
+        implements DynamicParallelismInference {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The default split assigner, a lazy locality-aware assigner. */
+    public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER =
+            LocalityAwareSplitAssigner::new;
+
+    /**
+     * The default file enumerator used for splittable formats. The enumerator recursively
+     * enumerates files, splits files that consist of multiple distributed storage blocks into
+     * multiple splits, and filters hidden files (files starting with '.' or '_'). Files with
+     * suffixes of common compression formats (for example '.gzip', '.bz2', '.xz', '.zip', ...)
+     * will not be split.
+     */
+    public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR =
+            BlockSplittingRecursiveEnumerator::new;
+
+    /**
+     * The default file enumerator used for non-splittable formats. The enumerator recursively
+     * enumerates files, creates one split per file, and filters hidden files (files starting
+     * with '.' or '_').
+     */
+    public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR =
+            NonSplittingRecursiveEnumerator::new;
+
+    private final boolean isStreamingMode;
+    private final ContinuousEnumerationSettings continuousEnumerationSettings;
+
+    // ------------------------------------------------------------------------
+
+    private TestFileSource(
+            final Path[] inputPaths,
+            final FileEnumerator.Provider fileEnumerator,
+            final FileSplitAssigner.Provider splitAssigner,
+            final BulkFormat<T, FileSourceSplit> readerFormat,
+            final boolean isStreamingMode,
+            @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) {
+
+        super(
+                inputPaths,
+                fileEnumerator,
+                splitAssigner,
+                readerFormat,
+                continuousEnumerationSettings);
+        this.isStreamingMode = isStreamingMode;
+        this.continuousEnumerationSettings = continuousEnumerationSettings;
+    }
+
+    @Override
+    public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() {
+        return FileSourceSplitSerializer.INSTANCE;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return isStreamingMode && continuousEnumerationSettings != null
+                ? Boundedness.CONTINUOUS_UNBOUNDED
+                : Boundedness.BOUNDED;
+    }
+
+    @Override
+    public int inferParallelism(Context dynamicParallelismContext) {
+        FileEnumerator fileEnumerator = getEnumeratorFactory().create();
+
+        Collection<FileSourceSplit> splits;
+        try {
+            splits =
+                    fileEnumerator.enumerateSplits(
+                            inputPaths,
+                            dynamicParallelismContext.getParallelismInferenceUpperBound());
+        } catch (IOException e) {
+            throw new FlinkRuntimeException("Could not enumerate file splits", e);
+        }
+
+        return Math.min(
+                splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound());
+    }
+
+    // ------------------------------------------------------------------------
+    //  Entry-point Factory Methods
+    // ------------------------------------------------------------------------
+    /**
+     * Builds a new {@code TestFileSource} using a {@link BulkFormat} to read batches of records
+     * from files.
+     *
+     * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet.
+     */
+    public static <T> TestFileSource.TestFileSourceBuilder<T> forBulkFileFormat(
+            final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) {
+        Preconditions.checkNotNull(bulkFormat, "reader");
+        Preconditions.checkNotNull(paths, "paths");
+        Preconditions.checkArgument(paths.length > 0, "paths must not be empty");
+
+        return new TestFileSource.TestFileSourceBuilder<>(paths, bulkFormat);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Builder
+    // ------------------------------------------------------------------------
+
+    /**
+     * The builder for the {@code TestFileSource}, to configure the various behaviors.
+     *
+     * <p>Start building the source via one of the following methods:
+     *
+     * <ul>
+     *   <li>{@link TestFileSource#forBulkFileFormat(BulkFormat, Path...)}
+     * </ul>
+     */
+    public static final class TestFileSourceBuilder<T>
+            extends AbstractFileSourceBuilder<T, FileSourceSplit, FileSource.FileSourceBuilder<T>> {
+
+        private boolean isStreamingMode = false;
+
+        TestFileSourceBuilder(Path[] inputPaths, BulkFormat<T, FileSourceSplit> readerFormat) {
+            super(
+                    inputPaths,
+                    readerFormat,
+                    readerFormat.isSplittable()
+                            ? DEFAULT_SPLITTABLE_FILE_ENUMERATOR
+                            : DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR,
+                    DEFAULT_SPLIT_ASSIGNER);
+        }
+
+        public TestFileSourceBuilder<T> setStreamingMode(boolean streamingMode) {
+            this.isStreamingMode = streamingMode;
+            return this;
+        }
+
+        @Override
+        public TestFileSource<T> build() {
+            return new TestFileSource<>(
+                    inputPaths,
+                    fileEnumerator,
+                    splitAssigner,
+                    readerFormat,
+                    isStreamingMode,
+                    continuousSourceSettings);
+        }
+    }
+}
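
A minimal, hypothetical usage sketch of the builder above (not part of this
commit). StreamFormatAdapter and TextLineInputFormat are stock
flink-connector-files classes used here only for illustration, and the input
path is a placeholder:

    import java.time.Duration;

    import org.apache.flink.connector.file.src.TestFileSource;
    import org.apache.flink.connector.file.src.impl.StreamFormatAdapter;
    import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
    import org.apache.flink.core.fs.Path;

    public class TestFileSourceExample {
        public static void main(String[] args) {
            // Build a TestFileSource over line-delimited text files.
            TestFileSource.TestFileSourceBuilder<String> builder =
                    TestFileSource.forBulkFileFormat(
                            new StreamFormatAdapter<>(new TextLineInputFormat()),
                            new Path("/tmp/input"));
            // Periodic discovery plus streaming mode yields CONTINUOUS_UNBOUNDED;
            // without monitorContinuously(...) the source stays BOUNDED.
            builder.monitorContinuously(Duration.ofSeconds(10));
            TestFileSource<String> source = builder.setStreamingMode(true).build();
            System.out.println(source.getBoundedness());
        }
    }
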
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
new file mode 100644
index 00000000000..3158f03473c
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.TestFileSource;
+import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveAllDirEnumerator;
+import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveAllDirEnumerator;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.source.SourceProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.DataType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Test file system table source. */
+@Internal
+public class TestFileSystemTableSource extends FileSystemTableSource {
+
+    public TestFileSystemTableSource(
+            ObjectIdentifier tableIdentifier,
+            DataType physicalRowDataType,
+            List<String> partitionKeys,
+            ReadableConfig tableOptions,
+            @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat,
+            @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) {
+        super(
+                tableIdentifier,
+                physicalRowDataType,
+                partitionKeys,
+                tableOptions,
+                bulkReaderFormat,
+                deserializationFormat);
+    }
+
+    @Override
+    protected SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
+        final TestFileSource.TestFileSourceBuilder<RowData> fileSourceBuilder =
+                TestFileSource.forBulkFileFormat(bulkFormat, paths());
+
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL)
+                .ifPresent(fileSourceBuilder::monitorContinuously);
+        tableOptions
+                .getOptional(FileSystemConnectorOptions.SOURCE_PATH_REGEX_PATTERN)
+                .ifPresent(
+                        regex ->
+                                fileSourceBuilder.setFileEnumerator(
+                                        bulkFormat.isSplittable()
+                                                ? () -> new BlockSplittingRecursiveAllDirEnumerator(regex)
+                                                : () -> new NonSplittingRecursiveAllDirEnumerator(regex)));
+
+        boolean isStreamingMode =
+                tableOptions.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
+        fileSourceBuilder.setStreamingMode(isStreamingMode);
+
+        return SourceProvider.of(fileSourceBuilder.build());
+    }
+}
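
The options handled above map to DDL keys; a hypothetical sketch follows
(table name, path, and interval are illustrative; 'source.monitor-interval' is,
to my understanding, the key behind FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL):

    import org.apache.flink.table.api.TableEnvironment;

    public class TestFileSystemTableDdl {
        // Assumes the test-filesystem factory is on the classpath. In STREAMING
        // runtime mode, a monitor interval makes the overridden
        // createSourceProvider build a continuously monitoring, unbounded
        // TestFileSource instead of a bounded one.
        public static void register(TableEnvironment tEnv) {
            tEnv.executeSql(
                    "CREATE TABLE lines (line STRING) WITH ("
                            + " 'connector' = 'test-filesystem',"
                            + " 'path' = '/tmp/lines',"
                            + " 'format' = 'csv',"
                            + " 'source.monitor-interval' = '10s')");
        }
    }
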
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
new file mode 100644
index 00000000000..aa5cd5e17bb
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.file.table.FileSystemTableFactory;
+import org.apache.flink.connector.file.table.TestFileSystemTableSource;
+import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+/** Test filesystem {@link Factory}. */
+@Internal
+public class TestFileSystemTableFactory extends FileSystemTableFactory {
+
+    public static final String IDENTIFIER = "test-filesystem";
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        FactoryUtil.TableFactoryHelper helper =
+                FactoryUtil.createTableFactoryHelper(this, context);
+        validate(helper);
+
+        return new TestFileSystemTableSource(
+                context.getObjectIdentifier(),
+                context.getPhysicalRowDataType(),
+                context.getCatalogTable().getPartitionKeys(),
+                helper.getOptions(),
+                discoverDecodingFormat(context, BulkReaderFormatFactory.class),
+                discoverDecodingFormat(context, DeserializationFormatFactory.class));
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java
new file mode 100644
index 00000000000..755ecfd897b
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.util.jackson.JacksonMapperFactory;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+
+public class JsonSerdeUtil {
+
+    /**
+     * Shared ObjectMapper instance used to serialize and deserialize table metadata. Note that
+     * creating and copying object mappers is expensive and should be avoided.
+     */
+    private static final ObjectMapper OBJECT_MAPPER_INSTANCE;
+
+    static {
+        OBJECT_MAPPER_INSTANCE =
+                JacksonMapperFactory.createObjectMapper()
+                        .configure(
+                                JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(),
+                                true);
+    }
+
+    public static <T> T fromJson(String json, Class<T> clazz) {
+        try {
+            return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    public static <T> String toJson(T t) {
+        try {
+            return OBJECT_MAPPER_INSTANCE.writerWithDefaultPrettyPrinter().writeValueAsString(t);
+        } catch (JsonProcessingException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+}
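
A hypothetical round-trip sketch for the utility above (the POJO is
illustrative, not part of this commit):

    import java.util.Collections;
    import java.util.Map;

    import org.apache.flink.table.file.testutils.catalog.JsonSerdeUtil;

    public class JsonSerdeUtilExample {
        public static class TableInfoPojo {
            public String name;
            public Map<String, String> options;
        }

        public static void main(String[] args) {
            TableInfoPojo pojo = new TableInfoPojo();
            pojo.name = "t1";
            pojo.options = Collections.singletonMap("connector", "test-filesystem");

            String json = JsonSerdeUtil.toJson(pojo); // pretty-printed JSON
            TableInfoPojo copy = JsonSerdeUtil.fromJson(json, TableInfoPojo.class);
            System.out.println(copy.name); // prints "t1"
        }
    }
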
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
new file mode 100644
index 00000000000..490dd29d608
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.AbstractCatalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogFunction;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogPartition;
+import org.apache.flink.table.catalog.CatalogPartitionSpec;
+import org.apache.flink.table.catalog.CatalogPropertiesUtil;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogView;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogBaseTable;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.FunctionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException;
+import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
+import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
+import org.apache.flink.table.catalog.exceptions.TablePartitionedException;
+import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
+import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.file.testutils.TestFileSystemTableFactory;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/** A test catalog implementation backed by a {@link FileSystem} directory. */
+public class TestFileSystemCatalog extends AbstractCatalog {
+
+    public static final String SCHEMA_PATH = "schema";
+    public static final String DATA_PATH = "data";
+    private static final String SCHEMA_FILE_EXTENSION = ".json";
+
+    private final String catalogPathStr;
+
+    private Path catalogPath;
+    private FileSystem fs;
+
+    public TestFileSystemCatalog(String pathStr, String name, String defaultDatabase) {
+        super(name, defaultDatabase);
+        this.catalogPathStr = pathStr;
+    }
+
+    @VisibleForTesting
+    public String getCatalogPathStr() {
+        return catalogPathStr;
+    }
+
+    @Override
+    public Optional<Factory> getFactory() {
+        return Optional.of(new TestFileSystemTableFactory());
+    }
+
+    @Override
+    public void open() throws CatalogException {
+        try {
+            catalogPath = new Path(catalogPathStr);
+            fs = catalogPath.getFileSystem();
+            if (!fs.exists(catalogPath)) {
+                throw new CatalogException(
+                        String.format(
+                                "Catalog %s path %s does not exist.", 
getName(), catalogPath));
+            }
+            if (!fs.getFileStatus(catalogPath).isDir()) {
+                throw new CatalogException(
+                        String.format(
+                                "Failed to open catalog path. The given path 
%s is not a directory.",
+                                catalogPath));
+            }
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Checking catalog path %s exists occur 
exception.", catalogPath),
+                    e);
+        }
+    }
+
+    @Override
+    public void close() throws CatalogException {}
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        try {
+            FileStatus[] fileStatuses = fs.listStatus(catalogPath);
+            return Arrays.stream(fileStatuses)
+                    .filter(FileStatus::isDir)
+                    .map(fileStatus -> fileStatus.getPath().getName())
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new CatalogException("Listing database occur exception.", e);
+        }
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (databaseExists(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public boolean databaseExists(String databaseName) throws CatalogException {
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(databaseName),
+                "The database name cannot be null or empty.");
+
+        return listDatabases().contains(databaseName);
+    }
+
+    @Override
+    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
+            throws DatabaseAlreadyExistException, CatalogException {
+        if (databaseExists(name)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new DatabaseAlreadyExistException(getName(), name);
+            }
+        }
+
+        if (!CollectionUtil.isNullOrEmpty(database.getProperties())) {
+            throw new CatalogException(
+                    "TestFilesystem catalog doesn't support to create database 
with options.");
+        }
+
+        Path dbPath = new Path(catalogPath, name);
+        try {
+            fs.mkdirs(dbPath);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Creating database %s occur exception.", 
name), e);
+        }
+    }
+
+    @Override
+    public void dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade)
+            throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new DatabaseNotExistException(getName(), databaseName);
+            }
+        }
+
+        List<String> tables = listTables(databaseName);
+        if (!tables.isEmpty() && !cascade) {
+            throw new DatabaseNotEmptyException(getName(), databaseName);
+        }
+
+        if (databaseName.equals(getDefaultDatabase())) {
+            throw new IllegalArgumentException(
+                    "TestFilesystem catalog doesn't support to drop the 
default database.");
+        }
+
+        Path dbPath = new Path(catalogPath, databaseName);
+        try {
+            fs.delete(dbPath, true);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Dropping database %s occur exception.", 
databaseName), e);
+        }
+    }
+
+    @Override
+    public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists)
+            throws DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterDatabase is not implemented.");
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+
+        Path dbPath = new Path(catalogPath, databaseName);
+        try {
+            return Arrays.stream(fs.listStatus(dbPath))
+                    .filter(FileStatus::isDir)
+                    .map(fileStatus -> fileStatus.getPath().getName())
+                    .collect(Collectors.toList());
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Listing table in database %s occur 
exception.", dbPath), e);
+        }
+    }
+
+    @Override
+    public List<String> listViews(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        final Path tableSchemaPath =
+                tableSchemaFilePath(
+                        inferTableSchemaPath(catalogPathStr, tablePath), tablePath.getObjectName());
+        final Path tableDataPath = inferTableDataPath(catalogPathStr, tablePath);
+        try {
+            FileSystemTableInfo tableInfo =
+                    JsonSerdeUtil.fromJson(
+                            readFileUtf8(tableSchemaPath), FileSystemTableInfo.class);
+            return deserializeTable(
+                    tableInfo.getTableKind(),
+                    tableInfo.getCatalogTableInfo(),
+                    tableDataPath.getPath());
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Getting table %s occur exception.", 
tablePath), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        Path path = inferTablePath(catalogPathStr, tablePath);
+        Path tableSchemaFilePath =
+                tableSchemaFilePath(
+                        inferTableSchemaPath(catalogPathStr, tablePath), tablePath.getObjectName());
+        try {
+            return fs.exists(path) && fs.exists(tableSchemaFilePath);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Checking table %s exists occur exception.", 
tablePath), e);
+        }
+    }
+
+    @Override
+    public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(getName(), tablePath);
+            }
+        }
+
+        Path path = inferTablePath(catalogPathStr, tablePath);
+        try {
+            fs.delete(path, true);
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Dropping table %s occur exception.", 
tablePath), e);
+        }
+    }
+
+    @Override
+    public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException, CatalogException {
+        throw new UnsupportedOperationException("renameTable is not implemented.");
+    }
+
+    @Override
+    public void createTable(
+            ObjectPath tablePath, CatalogBaseTable catalogTable, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
+        if (!databaseExists(tablePath.getDatabaseName())) {
+            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
+        }
+        if (tableExists(tablePath)) {
+            if (ignoreIfExists) {
+                return;
+            } else {
+                throw new TableAlreadyExistException(getName(), tablePath);
+            }
+        }
+
+        if (catalogTable instanceof CatalogView) {
+            throw new UnsupportedOperationException(
+                    "TestFilesystem catalog doesn't support to CREATE VIEW.");
+        }
+
+        Tuple4<Path, Path, Path, String> tableSchemaTuple =
+                getTableJsonInfo(tablePath, catalogTable);
+        Path path = tableSchemaTuple.f0;
+        Path tableSchemaPath = tableSchemaTuple.f1;
+        Path tableDataPath = tableSchemaTuple.f2;
+        String jsonSchema = tableSchemaTuple.f3;
+        try {
+            if (!fs.exists(path)) {
+                fs.mkdirs(path);
+                fs.mkdirs(tableSchemaPath);
+                fs.mkdirs(tableDataPath);
+            }
+
+            // write table schema
+            Path tableSchemaFilePath =
+                    tableSchemaFilePath(tableSchemaPath, tablePath.getObjectName());
+            try (FSDataOutputStream os =
+                    fs.create(tableSchemaFilePath, FileSystem.WriteMode.NO_OVERWRITE)) {
+                os.write(jsonSchema.getBytes(StandardCharsets.UTF_8));
+            }
+
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Create table %s occur exception.", 
tablePath), e);
+        }
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterTable is not implemented.");
+    }
+
+    @Override
+    public void alterTable(
+            ObjectPath tablePath,
+            CatalogBaseTable newTable,
+            List<TableChange> tableChanges,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (ignoreIfNotExists && !tableExists(tablePath)) {
+            return;
+        }
+
+        Tuple4<Path, Path, Path, String> tableSchemaInfo = getTableJsonInfo(tablePath, newTable);
+        Path tableSchemaPath = tableSchemaInfo.f1;
+        String jsonSchema = tableSchemaInfo.f3;
+        try {
+            if (!fs.exists(tableSchemaPath)) {
+                throw new CatalogException(
+                        String.format(
+                                "Table %s schema file %s doesn't exists.",
+                                tablePath, tableSchemaPath));
+            }
+            // write new table schema
+            Path tableSchemaFilePath =
+                    tableSchemaFilePath(tableSchemaPath, tablePath.getObjectName());
+            try (FSDataOutputStream os =
+                    fs.create(tableSchemaFilePath, FileSystem.WriteMode.OVERWRITE)) {
+                os.write(jsonSchema.getBytes(StandardCharsets.UTF_8));
+            }
+
+        } catch (IOException e) {
+            throw new CatalogException(
+                    String.format("Altering table %s occur exception.", 
tablePath), e);
+        }
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitions(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<CatalogPartitionSpec> listPartitionsByFilter(
+            ObjectPath tablePath, List<Expression> filters)
+            throws TableNotExistException, TableNotPartitionedException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        throw new PartitionNotExistException(getName(), tablePath, partitionSpec);
+    }
+
+    @Override
+    public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition partition,
+            boolean ignoreIfExists)
+            throws TableNotExistException, TableNotPartitionedException,
+                    PartitionSpecInvalidException, PartitionAlreadyExistsException,
+                    CatalogException {
+        throw new UnsupportedOperationException("createPartition is not implemented.");
+    }
+
+    @Override
+    public void dropPartition(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("dropPartition is not implemented.");
+    }
+
+    @Override
+    public void alterPartition(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogPartition newPartition,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterPartition is not 
implemented.");
+    }
+
+    @Override
+    public List<String> listFunctions(String dbName)
+            throws DatabaseNotExistException, CatalogException {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public CatalogFunction getFunction(ObjectPath functionPath)
+            throws FunctionNotExistException, CatalogException {
+        throw new FunctionNotExistException(getName(), functionPath);
+    }
+
+    @Override
+    public boolean functionExists(ObjectPath functionPath) throws CatalogException {
+        return false;
+    }
+
+    @Override
+    public void createFunction(
+            ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists)
+            throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException {
+        throw new UnsupportedOperationException("createFunction is not implemented.");
+    }
+
+    @Override
+    public void alterFunction(
+            ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterFunction is not implemented.");
+    }
+
+    @Override
+    public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists)
+            throws FunctionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("dropFunction is not implemented.");
+    }
+
+    @Override
+    public CatalogTableStatistics getTableStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogTableStatistics getPartitionStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogTableStatistics.UNKNOWN;
+    }
+
+    @Override
+    public CatalogColumnStatistics getPartitionColumnStatistics(
+            ObjectPath tablePath, CatalogPartitionSpec partitionSpec)
+            throws PartitionNotExistException, CatalogException {
+        return CatalogColumnStatistics.UNKNOWN;
+    }
+
+    @Override
+    public void alterTableStatistics(
+            ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterTableStatistics is not implemented.");
+    }
+
+    @Override
+    public void alterTableColumnStatistics(
+            ObjectPath tablePath,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException, TablePartitionedException {
+        throw new UnsupportedOperationException("alterTableColumnStatistics is not implemented.");
+    }
+
+    @Override
+    public void alterPartitionStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogTableStatistics partitionStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException("alterPartitionStatistics is 
not implemented.");
+    }
+
+    @Override
+    public void alterPartitionColumnStatistics(
+            ObjectPath tablePath,
+            CatalogPartitionSpec partitionSpec,
+            CatalogColumnStatistics columnStatistics,
+            boolean ignoreIfNotExists)
+            throws PartitionNotExistException, CatalogException {
+        throw new UnsupportedOperationException(
+                "alterPartitionColumnStatistics is not implemented.");
+    }
+
+    private Tuple4<Path, Path, Path, String> getTableJsonInfo(
+            ObjectPath tablePath, CatalogBaseTable catalogTable) {
+        final Path path = inferTablePath(catalogPathStr, tablePath);
+        final Path tableSchemaPath = inferTableSchemaPath(catalogPathStr, tablePath);
+        final Path tableDataPath = inferTableDataPath(catalogPathStr, tablePath);
+
+        ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
+                (ResolvedCatalogBaseTable<?>) catalogTable;
+        Map<String, String> catalogTableInfo = serializeTable(resolvedCatalogBaseTable);
+        CatalogBaseTable.TableKind tableKind = catalogTable.getTableKind();
+        FileSystemTableInfo tableInfo = new FileSystemTableInfo(tableKind, catalogTableInfo);
+
+        String jsonSchema = JsonSerdeUtil.toJson(tableInfo);
+        return Tuple4.of(path, tableSchemaPath, tableDataPath, jsonSchema);
+    }
+
+    /** Reads the full file content and decodes it as UTF-8. */
+    private String readFileUtf8(Path path) throws IOException {
+        try (FSDataInputStream in = path.getFileSystem().open(path)) {
+            BufferedReader reader =
+                    new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
+            StringBuilder builder = new StringBuilder();
+            String line;
+            while ((line = reader.readLine()) != null) {
+                builder.append(line);
+            }
+            return builder.toString();
+        }
+    }
+
+    private Path inferTablePath(String catalogPath, ObjectPath tablePath) {
+        return new Path(
+                String.format(
+                        "%s/%s/%s",
+                        catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName());
+    }
+
+    private Path inferTableDataPath(String catalogPath, ObjectPath tablePath) {
+        return new Path(
+                String.format(
+                        "%s/%s/%s/%s",
+                        catalogPath,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName(),
+                        DATA_PATH));
+    }
+
+    private Path inferTableSchemaPath(String catalogPath, ObjectPath tablePath) {
+        return new Path(
+                String.format(
+                        "%s/%s/%s/%s",
+                        catalogPath,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName(),
+                        SCHEMA_PATH));
+    }
+
+    private Path tableSchemaFilePath(Path tableSchemaPath, String tableName) {
+        return new Path(tableSchemaPath, tableName + "_schema" + 
SCHEMA_FILE_EXTENSION);
+    }
+
+    private Map<String, String> serializeTable(
+            ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable) {
+        if (resolvedCatalogBaseTable instanceof ResolvedCatalogTable) {
+            return CatalogPropertiesUtil.serializeCatalogTable(
+                    (ResolvedCatalogTable) resolvedCatalogBaseTable);
+        } else if (resolvedCatalogBaseTable instanceof ResolvedCatalogMaterializedTable) {
+            return CatalogPropertiesUtil.serializeCatalogMaterializedTable(
+                    (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable);
+        }
+
+        throw new IllegalArgumentException(
+                "Unknown kind of resolved catalog base table: "
+                        + resolvedCatalogBaseTable.getClass());
+    }
+
+    private CatalogBaseTable deserializeTable(
+            CatalogBaseTable.TableKind tableKind,
+            Map<String, String> properties,
+            String tableDataPath) {
+        if (CatalogBaseTable.TableKind.TABLE == tableKind) {
+            CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(properties);
+            // put table data path
+            Map<String, String> options = new HashMap<>(catalogTable.getOptions());
+            options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
+            return catalogTable.copy(options);
+        } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) {
+            CatalogMaterializedTable catalogMaterializedTable =
+                    CatalogPropertiesUtil.deserializeCatalogMaterializedTable(properties);
+            // put table data path
+            Map<String, String> options = new HashMap<>(catalogMaterializedTable.getOptions());
+            options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath);
+            return catalogMaterializedTable.copy(options);
+        }
+
+        throw new IllegalArgumentException("Unknown catalog base table kind: " 
+ tableKind);
+    }
+
+    /** A POJO representing serializable catalog base table info. */
+    public static class FileSystemTableInfo {
+
+        private final CatalogBaseTable.TableKind tableKind;
+        private final Map<String, String> catalogTableInfo;
+
+        @JsonCreator
+        public FileSystemTableInfo(
+                @JsonProperty("tableKind") CatalogBaseTable.TableKind 
tableKind,
+                @JsonProperty("catalogTableInfo") Map<String, String> 
catalogTableInfo) {
+            this.tableKind = tableKind;
+            this.catalogTableInfo = catalogTableInfo;
+        }
+
+        public CatalogBaseTable.TableKind getTableKind() {
+            return tableKind;
+        }
+
+        public Map<String, String> getCatalogTableInfo() {
+            return catalogTableInfo;
+        }
+    }
+}
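
A hypothetical sketch of the on-disk layout implied by the infer* helpers
above, together with minimal programmatic usage (all paths and names are
illustrative placeholders):

    import org.apache.flink.table.catalog.ObjectPath;
    import org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog;

    public class CatalogLayoutExample {
        public static void main(String[] args) {
            // Layout derived by inferTablePath/inferTableSchemaPath/inferTableDataPath
            // for a table db1.t1 under catalog path /tmp/test-catalog:
            //
            //   /tmp/test-catalog/db1/t1/schema/t1_schema.json  <- serialized FileSystemTableInfo
            //   /tmp/test-catalog/db1/t1/data/                  <- table data files
            TestFileSystemCatalog catalog =
                    new TestFileSystemCatalog("/tmp/test-catalog", "test_catalog", "db1");
            catalog.open(); // fails unless /tmp/test-catalog exists and is a directory
            System.out.println(catalog.tableExists(new ObjectPath("db1", "t1")));
            catalog.close();
        }
    }
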
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
new file mode 100644
index 00000000000..41999468d52
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.flink.table.file.testutils.TestFileSystemTableFactory.IDENTIFIER;
+
+/** Factory for {@link TestFileSystemCatalog}. */
+public class TestFileSystemCatalogFactory implements CatalogFactory {
+
+    public static final ConfigOption<String> PATH =
+            ConfigOptions.key("path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Catalog base DFS path, used for inferring the 
sink table path. "
+                                    + "The default strategy for a table path 
is: ${catalog.path}/${db_name}/${table_name}");
+
+    public static final ConfigOption<String> DEFAULT_DATABASE =
+            ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY)
+                    .stringType()
+                    .defaultValue("default");
+
+    @Override
+    public String factoryIdentifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Catalog createCatalog(Context context) {
+        final FactoryUtil.CatalogFactoryHelper helper =
+                FactoryUtil.createCatalogFactoryHelper(this, context);
+        helper.validate();
+        return new TestFileSystemCatalog(
+                helper.getOptions().get(PATH),
+                context.getName(),
+                helper.getOptions().get(DEFAULT_DATABASE));
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        Set<ConfigOption<?>> set = new HashSet<>();
+        set.add(PATH);
+        set.add(DEFAULT_DATABASE);
+        return set;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        return Collections.emptySet();
+    }
+}
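
[Editor's note] For context, a sketch of how these options surface in DDL (catalog name and path are placeholders; the same shape appears in TestFileSystemCatalogFactoryTest further below):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class RegisterTestFileSystemCatalog {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // 'type' matches factoryIdentifier(); 'path' and 'default-database'
            // are the two required options declared above. The default database
            // directory is assumed to already exist under the catalog path, as
            // the tests below create it up front.
            tEnv.executeSql(
                    "CREATE CATALOG my_test_catalog WITH (\n"
                            + "  'type' = 'test-filesystem',\n"
                            + "  'path' = '/tmp/catalog-root',\n"
                            + "  'default-database' = 'default'\n"
                            + ")");
            tEnv.useCatalog("my_test_catalog");
        }
    }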
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 00000000000..7977248c5be
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory
+org.apache.flink.table.file.testutils.TestFileSystemTableFactory
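
[Editor's note] These two entries are picked up through the standard JDK ServiceLoader mechanism that FactoryUtil builds on. A purely illustrative sketch of listing the discovered identifiers; note both factories above report the same "test-filesystem" identifier:

    import org.apache.flink.table.factories.Factory;

    import java.util.ServiceLoader;

    public class ListDiscoveredFactories {
        public static void main(String[] args) {
            // Iterates every factory registered through a META-INF/services entry;
            // with this module on the classpath the output should include
            // "test-filesystem" twice (catalog factory and table factory).
            for (Factory factory : ServiceLoader.load(Factory.class)) {
                System.out.println(factory.factoryIdentifier());
            }
        }
    }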
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
new file mode 100644
index 00000000000..b02a7f124ad
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils;
+
+import org.apache.flink.connector.file.table.FileSystemTableSink;
+import org.apache.flink.connector.file.table.TestFileSystemTableSource;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TestFileSystemTableFactory}. */
+public class TestFileSystemTableFactoryTest {
+
+    private static final ResolvedSchema SCHEMA =
+            ResolvedSchema.of(
+                    Column.physical("f0", DataTypes.STRING()),
+                    Column.physical("f1", DataTypes.BIGINT()),
+                    Column.physical("f2", DataTypes.BIGINT()));
+
+    @Test
+    void testCreateSourceSink() {
+        Map<String, String> options = new HashMap<>();
+        options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem");
+        options.put("path", "/tmp");
+        options.put("format", "testcsv");
+
+        // verify that options prefixed with the format identifier are ignored
+        options.put("testcsv.my_option", "my_value");
+
+        DynamicTableSource source = createTableSource(SCHEMA, options);
+        assertThat(source).isInstanceOf(TestFileSystemTableSource.class);
+
+        DynamicTableSink sink = createTableSink(SCHEMA, options);
+        assertThat(sink).isInstanceOf(FileSystemTableSink.class);
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java
new file mode 100644
index 00000000000..4b77a979f93
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link TestFileSystemCatalog} created by {@link TestFileSystemCatalogFactory}. */
+public class TestFileSystemCatalogFactoryTest {
+
+    private static final String TEST_CATALOG = "test_catalog";
+    private static final String TEST_DEFAULT_DATABASE = "test_db";
+
+    @TempDir File tempFile;
+
+    @BeforeEach
+    void before() {
+        File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
+        testDb.mkdir();
+    }
+
+    @Test
+    public void testCreateCatalogWithDDL() {
+        final TableEnvironment tEnv =
+                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        String catalogDDL =
+                String.format(
+                        "CREATE CATALOG %s\n"
+                                + "WITH (\n"
+                                + "  'type' = 'test-filesystem',\n"
+                                + "  'path' = '%s',\n"
+                                + "  'default-database' = '%s'\n"
+                                + "  )",
+                        TEST_CATALOG, tempFile.getAbsolutePath(), TEST_DEFAULT_DATABASE);
+
+        tEnv.executeSql(catalogDDL);
+
+        List<Row> result =
+                CollectionUtil.iteratorToList(tEnv.executeSql("SHOW CATALOGS").collect());
+        assertThat(result.contains(Row.of(TEST_CATALOG))).isTrue();
+    }
+
+    @Test
+    public void testCreateCatalogWithFactory() {
+        final Map<String, String> properties = new HashMap<>();
+        properties.put("type", "test-filesystem");
+        properties.put("default-database", TEST_DEFAULT_DATABASE);
+        properties.put("path", tempFile.getAbsolutePath());
+        final TestFileSystemCatalog actualCatalog =
+                (TestFileSystemCatalog)
+                        FactoryUtil.createCatalog(
+                                TEST_CATALOG,
+                                properties,
+                                new Configuration(),
+                                TestFileSystemCatalogFactoryTest.class.getClassLoader());
+
+        assertThat(actualCatalog.getCatalogPathStr()).isEqualTo(tempFile.getAbsolutePath());
+        assertThat(actualCatalog.getName()).isEqualTo(TEST_CATALOG);
+        assertThat(actualCatalog.getDefaultDatabase()).isEqualTo(TEST_DEFAULT_DATABASE);
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
new file mode 100644
index 00000000000..e418f6c80a1
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link TestFileSystemCatalog}. */
+public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase {
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testReadAndWriteTestFileSystemTable(boolean isStreamingMode) throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironment.create(
+                        isStreamingMode
+                                ? EnvironmentSettings.inStreamingMode()
+                                : EnvironmentSettings.inBatchMode());
+
+        tEnv.registerCatalog(TEST_CATALOG, catalog);
+        tEnv.useCatalog(TEST_CATALOG);
+
+        tEnv.executeSql(
+                "CREATE TABLE CsvTable (\n"
+                        + "  id BIGINT,\n"
+                        + "  user_name STRING,\n"
+                        + "  message STRING,\n"
+                        + "  log_ts STRING\n"
+                        + ") WITH (\n"
+                        + "  'format' = 'csv'\n"
+                        + ")");
+
+        tEnv.getConfig().getConfiguration().setString("parallelism.default", "1");
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO %s.%s.CsvTable VALUES\n"
+                                        + "(1001, 'user1', 'hello world', 
'2021-06-10 10:00:00'),\n"
+                                        + "(1002, 'user2', 'hi', '2021-06-10 
10:01:00'),\n"
+                                        + "(1003, 'user3', 'ciao', '2021-06-10 
10:02:00'),\n"
+                                        + "(1004, 'user4', '你好', '2021-06-10 
10:03:00')",
+                                TEST_CATALOG, TEST_DEFAULT_DATABASE))
+                .await();
+
+        CloseableIterator<Row> result =
+                tEnv.executeSql(
+                                String.format(
+                                        "SELECT * FROM %s.%s.CsvTable",
+                                        TEST_CATALOG, TEST_DEFAULT_DATABASE))
+                        .collect();
+
+        // assert query result
+        List<Row> expected =
+                Arrays.asList(
+                        Row.of(1001L, "user1", "hello world", "2021-06-10 10:00:00"),
+                        Row.of(1002L, "user2", "hi", "2021-06-10 10:01:00"),
+                        Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"),
+                        Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00"));
+
+        assertThat(Lists.newArrayList(result)).isEqualTo(expected);
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
new file mode 100644
index 00000000000..07b11583d4f
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.TestSchemaResolver;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.refresh.RefreshHandler;
+
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH;
+import static org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog.DATA_PATH;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Test for {@link TestFileSystemCatalog}. */
+public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase {
+
+    private static final List<Column> CREATE_COLUMNS =
+            Arrays.asList(
+                    Column.physical("id", DataTypes.BIGINT()),
+                    Column.physical("name", DataTypes.VARCHAR(20)),
+                    Column.physical("age", DataTypes.INT()),
+                    Column.physical("tss", DataTypes.TIMESTAMP(3)),
+                    Column.physical("partition", DataTypes.VARCHAR(10)));
+    private static final UniqueConstraint CONSTRAINTS =
+            UniqueConstraint.primaryKey("primary_constraint", Collections.singletonList("id"));
+    private static final List<String> PARTITION_KEYS = Collections.singletonList("partition");
+
+    private static final ResolvedSchema CREATE_RESOLVED_SCHEMA =
+            new ResolvedSchema(CREATE_COLUMNS, Collections.emptyList(), CONSTRAINTS);
+
+    private static final Schema CREATE_SCHEMA =
+            Schema.newBuilder().fromResolvedSchema(CREATE_RESOLVED_SCHEMA).build();
+
+    private static final Map<String, String> EXPECTED_OPTIONS = new HashMap<>();
+
+    static {
+        EXPECTED_OPTIONS.put("source.monitor-interval", "5S");
+        EXPECTED_OPTIONS.put("auto-compaction", "true");
+    }
+
+    private static final ResolvedCatalogTable EXPECTED_CATALOG_TABLE =
+            new ResolvedCatalogTable(
+                    CatalogTable.newBuilder()
+                            .schema(CREATE_SCHEMA)
+                            .comment("test table")
+                            .partitionKeys(PARTITION_KEYS)
+                            .options(EXPECTED_OPTIONS)
+                            .build(),
+                    CREATE_RESOLVED_SCHEMA);
+
+    private static final String DEFINITION_QUERY = "SELECT id, region, county FROM T";
+    private static final Duration FRESHNESS = Duration.ofMinutes(3);
+    private static final ResolvedCatalogMaterializedTable EXPECTED_CATALOG_MATERIALIZED_TABLE =
+            new ResolvedCatalogMaterializedTable(
+                    CatalogMaterializedTable.newBuilder()
+                            .schema(CREATE_SCHEMA)
+                            .comment("test materialized table")
+                            .partitionKeys(PARTITION_KEYS)
+                            .options(EXPECTED_OPTIONS)
+                            .definitionQuery(DEFINITION_QUERY)
+                            .freshness(FRESHNESS)
+                            .logicalRefreshMode(
+                                    CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC)
+                            .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS)
+                            .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING)
+                            .build(),
+                    CREATE_RESOLVED_SCHEMA);
+
+    private static final TestRefreshHandler REFRESH_HANDLER =
+            new TestRefreshHandler("jobID: xxx, clusterId: yyy");
+
+    @Test
+    public void testListDatabases() {
+        List<String> actual = catalog.listDatabases();
+        assertThat(actual.contains(TEST_DEFAULT_DATABASE)).isTrue();
+        assertThat(actual.contains(NONE_EXIST_DATABASE)).isFalse();
+    }
+
+    @Test
+    public void testDatabaseExists() {
+        assertThat(catalog.databaseExists(TEST_DEFAULT_DATABASE)).isTrue();
+        assertThat(catalog.databaseExists(NONE_EXIST_DATABASE)).isFalse();
+    }
+
+    @Test
+    public void testCreateAndDropDatabase() throws Exception {
+        CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        catalog.createDatabase("db1", expected, true);
+
+        CatalogDatabase actual = catalog.getDatabase("db1");
+        assertThat(catalog.listDatabases().contains("db1")).isTrue();
+        assertThat(actual.getProperties()).isEqualTo(expected.getProperties());
+
+        // create an already existing database
+        assertThrows(
+                DatabaseAlreadyExistException.class,
+                () -> catalog.createDatabase("db1", expected, false));
+
+        // drop an existing database
+        catalog.dropDatabase("db1", true);
+        assertThat(catalog.listDatabases().contains("db1")).isFalse();
+
+        // drop a non-existent database
+        assertThrows(
+                DatabaseNotExistException.class,
+                () -> catalog.dropDatabase(NONE_EXIST_DATABASE, false));
+    }
+
+    @Test
+    public void testCreateDatabaseWithOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("k1", "v1");
+        options.put("k2", "v2");
+
+        assertThrows(
+                CatalogException.class,
+                () -> catalog.createDatabase("db1", new CatalogDatabaseImpl(options, null), true));
+    }
+
+    @Test
+    public void testCreateAndGetCatalogTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+        // test create table
+        catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+        // test table exists
+        assertThat(catalog.tableExists(tablePath)).isTrue();
+
+        Map<String, String> expectedOptions = new HashMap<>(EXPECTED_OPTIONS);
+        expectedOptions.put(
+                PATH.key(),
+                String.format(
+                        "%s/%s/%s/%s",
+                        tempFile.getAbsolutePath(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName(),
+                        DATA_PATH));
+
+        // test get table
+        CatalogBaseTable actualTable = catalog.getTable(tablePath);
+        // validate table type
+        assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE);
+
+        CatalogTable actualCatalogTable = (CatalogTable) actualTable;
+        // validate schema
+        assertThat(actualCatalogTable.getUnresolvedSchema().resolve(new TestSchemaResolver()))
+                .isEqualTo(CREATE_RESOLVED_SCHEMA);
+        // validate partition key
+        assertThat(actualCatalogTable.getPartitionKeys()).isEqualTo(PARTITION_KEYS);
+        // validate options
+        assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedOptions);
+
+        // test creating an already existing table
+        assertThrows(
+                TableAlreadyExistException.class,
+                () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false));
+    }
+
+    @Test
+    public void testCreateAndGetCatalogMaterializedTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");
+        // test create materialized table
+        catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, true);
+
+        // test materialized table exists
+        assertThat(catalog.tableExists(tablePath)).isTrue();
+
+        Map<String, String> expectedOptions = new HashMap<>(EXPECTED_OPTIONS);
+        expectedOptions.put(
+                PATH.key(),
+                String.format(
+                        "%s/%s/%s/%s",
+                        tempFile.getAbsolutePath(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName(),
+                        DATA_PATH));
+
+        // test get materialized table
+        CatalogBaseTable actualTable = catalog.getTable(tablePath);
+        // validate table type
+        assertThat(actualTable.getTableKind())
+                .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE);
+
+        CatalogMaterializedTable actualMaterializedTable = (CatalogMaterializedTable) actualTable;
+        // validate schema
+        assertThat(actualMaterializedTable.getUnresolvedSchema().resolve(new TestSchemaResolver()))
+                .isEqualTo(CREATE_RESOLVED_SCHEMA);
+        // validate partition key
+        assertThat(actualMaterializedTable.getPartitionKeys()).isEqualTo(PARTITION_KEYS);
+        // validate options
+        assertThat(actualMaterializedTable.getOptions()).isEqualTo(expectedOptions);
+        // validate definition query
+        assertThat(actualMaterializedTable.getDefinitionQuery()).isEqualTo(DEFINITION_QUERY);
+        // validate freshness
+        assertThat(actualMaterializedTable.getFreshness()).isEqualTo(FRESHNESS);
+        // validate logical refresh mode
+        assertThat(actualMaterializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        // validate refresh mode
+        assertThat(actualMaterializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+        // validate refresh status
+        assertThat(actualMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.INITIALIZING);
+        // validate refresh handler
+        assertThat(actualMaterializedTable.getRefreshHandlerDescription())
+                .isEqualTo(Optional.empty());
+        assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNull();
+
+        // test creating an already existing materialized table
+        assertThrows(
+                TableAlreadyExistException.class,
+                () -> catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, false));
+    }
+
+    @Test
+    public void testListTable() throws Exception {
+        ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+        ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");
+
+        // create table
+        catalog.createTable(tablePath1, EXPECTED_CATALOG_TABLE, true);
+        catalog.createTable(tablePath2, EXPECTED_CATALOG_MATERIALIZED_TABLE, true);
+
+        // test list table
+        List<String> tables = catalog.listTables(TEST_DEFAULT_DATABASE);
+        assertThat(tables.contains(tablePath1.getObjectName())).isTrue();
+        assertThat(tables.contains(tablePath2.getObjectName())).isTrue();
+
+        // test listing tables of a non-existent database
+        assertThrows(
+                DatabaseNotExistException.class, () -> catalog.listTables(NONE_EXIST_DATABASE));
+    }
+
+    @Test
+    public void testAlterCatalogTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+        // test create table
+        catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+        // test table exist
+        assertThat(catalog.tableExists(tablePath)).isTrue();
+
+        // alter table options
+        Map<String, String> options = new HashMap<>();
+        options.put("auto-compaction", "true");
+        options.put("sink.parallelism", "5");
+        ResolvedCatalogTable updatedResolvedTable = EXPECTED_CATALOG_TABLE.copy(options);
+        catalog.alterTable(tablePath, updatedResolvedTable, Collections.emptyList(), false);
+
+        // test get table
+        CatalogBaseTable actualTable = catalog.getTable(tablePath);
+        // validate table type
+        assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE);
+
+        // validate options
+        Map<String, String> expectedOptions = new HashMap<>(options);
+        expectedOptions.put(
+                PATH.key(),
+                String.format(
+                        "%s/%s/%s/%s",
+                        tempFile.getAbsolutePath(),
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName(),
+                        DATA_PATH));
+        assertThat(actualTable.getOptions()).isEqualTo(expectedOptions);
+    }
+
+    @Test
+    public void testAlterCatalogMaterializedTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2");
+        // test create materialized table
+        catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, true);
+
+        // test materialized table exists
+        assertThat(catalog.tableExists(tablePath)).isTrue();
+
+        // alter materialized table refresh handler
+        ResolvedCatalogMaterializedTable updatedMaterializedTable =
+                EXPECTED_CATALOG_MATERIALIZED_TABLE.copy(
+                        CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+                        REFRESH_HANDLER.asSummaryString(),
+                        REFRESH_HANDLER.toBytes());
+        catalog.alterTable(tablePath, updatedMaterializedTable, Collections.emptyList(), false);
+
+        // test get materialized table
+        CatalogBaseTable actualTable = catalog.getTable(tablePath);
+        // validate table type
+        assertThat(actualTable.getTableKind())
+                .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE);
+
+        CatalogMaterializedTable actualMaterializedTable = (CatalogMaterializedTable) actualTable;
+        // validate refresh status
+        assertThat(actualMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+        // validate refresh handler
+        assertThat(actualMaterializedTable.getRefreshHandlerDescription().get())
+                .isEqualTo(REFRESH_HANDLER.asSummaryString());
+        assertThat(actualMaterializedTable.getSerializedRefreshHandler())
+                .isEqualTo(REFRESH_HANDLER.toBytes());
+    }
+
+    @Test
+    public void testDropTable() throws Exception {
+        ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1");
+        // create table
+        catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true);
+
+        // test drop table
+        catalog.dropTable(tablePath, true);
+        assertThat(catalog.tableExists(tablePath)).isFalse();
+
+        // drop a non-existent table
+        assertThrows(
+                TableNotExistException.class,
+                () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false));
+    }
+
+    private static class TestRefreshHandler implements RefreshHandler {
+
+        private final String handlerString;
+
+        public TestRefreshHandler(String handlerString) {
+            this.handlerString = handlerString;
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "test refresh handler";
+        }
+
+        public byte[] toBytes() {
+            return handlerString.getBytes();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java
new file mode 100644
index 00000000000..8cd5fc306e6
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.file.testutils.catalog;
+
+import org.apache.flink.test.util.AbstractTestBase;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+
+/** Base class for test filesystem catalog. */
+public abstract class TestFileSystemCatalogTestBase extends AbstractTestBase {
+
+    protected static final String TEST_CATALOG = "test_catalog";
+    protected static final String TEST_DEFAULT_DATABASE = "test_db";
+    protected static final String NONE_EXIST_DATABASE = "none_exist_database";
+
+    protected TestFileSystemCatalog catalog;
+
+    @TempDir File tempFile;
+
+    @BeforeEach
+    void before() {
+        File testDb = new File(tempFile, TEST_DEFAULT_DATABASE);
+        testDb.mkdir();
+
+        String catalogPathStr = tempFile.getAbsolutePath();
+        catalog = new TestFileSystemCatalog(catalogPathStr, TEST_CATALOG, TEST_DEFAULT_DATABASE);
+        catalog.open();
+    }
+
+    @AfterEach
+    void close() {
+        if (catalog != null) {
+            catalog.close();
+        }
+    }
+}
diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties
new file mode 100644
index 00000000000..8bb9fe60fd0
--- /dev/null
+++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level=OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-test-utils-parent/pom.xml b/flink-test-utils-parent/pom.xml
index 0c6a7dc08ce..96273501e20 100644
--- a/flink-test-utils-parent/pom.xml
+++ b/flink-test-utils-parent/pom.xml
@@ -40,6 +40,7 @@ under the License.
                <module>flink-connector-test-utils</module>
                <module>flink-clients-test-utils</module>
                <module>flink-migration-test-utils</module>
+               <module>flink-table-filesystem-test-utils</module>
        </modules>
 
 </project>

