This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 714d1cb2e0bd0df03393492dc87cbd800af63e1b Author: fengli <ldliu...@163.com> AuthorDate: Wed Apr 24 18:18:47 2024 +0800 [FLINK-35189][test-utils] Introduce test-filesystem connector and catalog based on filesystem to support materialized table --- .../flink-table-filesystem-test-utils/pom.xml | 103 +++ .../flink/connector/file/src/TestFileSource.java | 189 ++++++ .../file/table/TestFileSystemTableSource.java | 88 +++ .../file/testutils/TestFileSystemTableFactory.java | 54 ++ .../file/testutils/catalog/JsonSerdeUtil.java | 61 ++ .../testutils/catalog/TestFileSystemCatalog.java | 690 +++++++++++++++++++++ .../catalog/TestFileSystemCatalogFactory.java | 78 +++ .../org.apache.flink.table.factories.Factory | 17 + .../testutils/TestFileSystemTableFactoryTest.java | 64 ++ .../catalog/TestFileSystemCatalogFactoryTest.java | 92 +++ .../catalog/TestFileSystemCatalogITCase.java | 89 +++ .../catalog/TestFileSystemCatalogTest.java | 379 +++++++++++ .../catalog/TestFileSystemCatalogTestBase.java | 56 ++ .../src/test/resources/log4j2-test.properties | 28 + flink-test-utils-parent/pom.xml | 1 + 15 files changed, 1989 insertions(+) diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml new file mode 100644 index 00000000000..e1c8fdce665 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/pom.xml @@ -0,0 +1,103 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils-parent</artifactId> + <version>1.20-SNAPSHOT</version> + </parent> + + <artifactId>flink-table-filesystem-test-utils</artifactId> + <name>Flink : Test utils : Table Filesystem</name> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java-bridge</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-files</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <!-- test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + 
<artifactId>flink-table-api-java</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-csv</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + +</project> \ No newline at end of file diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java new file mode 100644 index 00000000000..5d0e9ace77f --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/src/TestFileSource.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.src; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.DynamicParallelismInference; +import org.apache.flink.connector.file.src.assigners.FileSplitAssigner; +import org.apache.flink.connector.file.src.assigners.LocalityAwareSplitAssigner; +import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveEnumerator; +import org.apache.flink.connector.file.src.enumerate.FileEnumerator; +import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveEnumerator; +import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.Collection; + +/** + * A unified data source that reads files - both in batch and in streaming mode. This is used only + * for tests. Because {@link FileSource} is a final class, we can't extend it directly. + * + * @param <T> The type of the events/records produced by this source. + */ +@Internal +public class TestFileSource<T> extends AbstractFileSource<T, FileSourceSplit> + implements DynamicParallelismInference { + + private static final long serialVersionUID = 1L; + + /** The default split assigner, a lazy locality-aware assigner. 
*/ + public static final FileSplitAssigner.Provider DEFAULT_SPLIT_ASSIGNER = + LocalityAwareSplitAssigner::new; + + /** + * The default file enumerator used for splittable formats. The enumerator recursively + * enumerates files, split files that consist of multiple distributed storage blocks into + * multiple splits, and filters hidden files (files starting with '.' or '_'). Files with + * suffixes of common compression formats (for example '.gzip', '.bz2', '.xy', '.zip', ...) will + * not be split. + */ + public static final FileEnumerator.Provider DEFAULT_SPLITTABLE_FILE_ENUMERATOR = + BlockSplittingRecursiveEnumerator::new; + + /** + * The default file enumerator used for non-splittable formats. The enumerator recursively + * enumerates files, creates one split for the file, and filters hidden files (files starting + * with '.' or '_'). + */ + public static final FileEnumerator.Provider DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR = + NonSplittingRecursiveEnumerator::new; + + private final boolean isStreamingMode; + private final ContinuousEnumerationSettings continuousEnumerationSettings; + + // ------------------------------------------------------------------------ + + private TestFileSource( + final Path[] inputPaths, + final FileEnumerator.Provider fileEnumerator, + final FileSplitAssigner.Provider splitAssigner, + final BulkFormat<T, FileSourceSplit> readerFormat, + final boolean isStreamingMode, + @Nullable final ContinuousEnumerationSettings continuousEnumerationSettings) { + + super( + inputPaths, + fileEnumerator, + splitAssigner, + readerFormat, + continuousEnumerationSettings); + this.isStreamingMode = isStreamingMode; + this.continuousEnumerationSettings = continuousEnumerationSettings; + } + + @Override + public SimpleVersionedSerializer<FileSourceSplit> getSplitSerializer() { + return FileSourceSplitSerializer.INSTANCE; + } + + @Override + public Boundedness getBoundedness() { + return isStreamingMode && continuousEnumerationSettings != null + ? 
Boundedness.CONTINUOUS_UNBOUNDED + : Boundedness.BOUNDED; + } + + @Override + public int inferParallelism(Context dynamicParallelismContext) { + FileEnumerator fileEnumerator = getEnumeratorFactory().create(); + + Collection<FileSourceSplit> splits; + try { + splits = + fileEnumerator.enumerateSplits( + inputPaths, + dynamicParallelismContext.getParallelismInferenceUpperBound()); + } catch (IOException e) { + throw new FlinkRuntimeException("Could not enumerate file splits", e); + } + + return Math.min( + splits.size(), dynamicParallelismContext.getParallelismInferenceUpperBound()); + } + + // ------------------------------------------------------------------------ + // Entry-point Factory Methods + // ------------------------------------------------------------------------ + /** + * Builds a new {@code TestFileSource} using a {@link BulkFormat} to read batches of records + * from files. + * + * <p>Examples for bulk readers are compressed and vectorized formats such as ORC or Parquet. + */ + public static <T> TestFileSource.TestFileSourceBuilder<T> forBulkFileFormat( + final BulkFormat<T, FileSourceSplit> bulkFormat, final Path... paths) { + Preconditions.checkNotNull(bulkFormat, "reader"); + Preconditions.checkNotNull(paths, "paths"); + Preconditions.checkArgument(paths.length > 0, "paths must not be empty"); + + return new TestFileSource.TestFileSourceBuilder<>(paths, bulkFormat); + } + + // ------------------------------------------------------------------------ + // Builder + // ------------------------------------------------------------------------ + + /** + * The builder for the {@code TestFileSource}, to configure the various behaviors. 
+ * + * <p>Start building the source via one of the following methods: + * + * <ul> + * <li>{@link TestFileSource#forBulkFileFormat(BulkFormat, Path...)} + * </ul> + */ + public static final class TestFileSourceBuilder<T> + extends AbstractFileSourceBuilder<T, FileSourceSplit, TestFileSourceBuilder<T>> { + + private boolean isStreamingMode = false; + + TestFileSourceBuilder(Path[] inputPaths, BulkFormat<T, FileSourceSplit> readerFormat) { + super( + inputPaths, + readerFormat, + readerFormat.isSplittable() + ? DEFAULT_SPLITTABLE_FILE_ENUMERATOR + : DEFAULT_NON_SPLITTABLE_FILE_ENUMERATOR, + DEFAULT_SPLIT_ASSIGNER); + } + + public TestFileSourceBuilder<T> setStreamingMode(boolean streamingMode) { + this.isStreamingMode = streamingMode; + return this; + } + + @Override + public TestFileSource<T> build() { + return new TestFileSource<>( + inputPaths, + fileEnumerator, + splitAssigner, + readerFormat, + isStreamingMode, + continuousSourceSettings); + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java new file mode 100644 index 00000000000..3158f03473c --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/connector/file/table/TestFileSystemTableSource.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.file.table; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.connector.file.src.FileSourceSplit; +import org.apache.flink.connector.file.src.TestFileSource; +import org.apache.flink.connector.file.src.enumerate.BlockSplittingRecursiveAllDirEnumerator; +import org.apache.flink.connector.file.src.enumerate.NonSplittingRecursiveAllDirEnumerator; +import org.apache.flink.connector.file.src.reader.BulkFormat; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.source.SourceProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.DataType; + +import javax.annotation.Nullable; + +import java.util.List; + +/** Test {@link FileSystemTableSource} that reads via {@link TestFileSource}. 
*/ +@Internal +public class TestFileSystemTableSource extends FileSystemTableSource { + + public TestFileSystemTableSource( + ObjectIdentifier tableIdentifier, + DataType physicalRowDataType, + List<String> partitionKeys, + ReadableConfig tableOptions, + @Nullable DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat, + @Nullable DecodingFormat<DeserializationSchema<RowData>> deserializationFormat) { + super( + tableIdentifier, + physicalRowDataType, + partitionKeys, + tableOptions, + bulkReaderFormat, + deserializationFormat); + } + + @Override + protected SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) { + final TestFileSource.TestFileSourceBuilder<RowData> fileSourceBuilder = + TestFileSource.forBulkFileFormat(bulkFormat, paths()); + + tableOptions + .getOptional(FileSystemConnectorOptions.SOURCE_MONITOR_INTERVAL) + .ifPresent(fileSourceBuilder::monitorContinuously); + tableOptions + .getOptional(FileSystemConnectorOptions.SOURCE_PATH_REGEX_PATTERN) + .ifPresent( + regex -> + fileSourceBuilder.setFileEnumerator( + bulkFormat.isSplittable() + ? 
() -> + new BlockSplittingRecursiveAllDirEnumerator( + regex) + : () -> + new NonSplittingRecursiveAllDirEnumerator( + regex))); + + boolean isStreamingMode = + tableOptions.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; + fileSourceBuilder.setStreamingMode(isStreamingMode); + + return SourceProvider.of(fileSourceBuilder.build()); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java new file mode 100644 index 00000000000..aa5cd5e17bb --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactory.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.file.testutils; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.file.table.FileSystemTableFactory; +import org.apache.flink.connector.file.table.TestFileSystemTableSource; +import org.apache.flink.connector.file.table.factories.BulkReaderFormatFactory; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; + +/** Test filesystem {@link Factory} that creates {@link TestFileSystemTableSource} for reads. */ +@Internal +public class TestFileSystemTableFactory extends FileSystemTableFactory { + + public static final String IDENTIFIER = "test-filesystem"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public DynamicTableSource createDynamicTableSource(Context context) { + FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); + validate(helper); + + return new TestFileSystemTableSource( + context.getObjectIdentifier(), + context.getPhysicalRowDataType(), + context.getCatalogTable().getPartitionKeys(), + helper.getOptions(), + discoverDecodingFormat(context, BulkReaderFormatFactory.class), + discoverDecodingFormat(context, DeserializationFormatFactory.class)); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java new file mode 100644 index 00000000000..755ecfd897b --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/JsonSerdeUtil.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.util.jackson.JacksonMapperFactory; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.json.JsonReadFeature; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; +import java.io.UncheckedIOException; + +public class JsonSerdeUtil { + + /** + * Shared object mapper instance used by {@code fromJson} and {@code toJson}. Note that + * creating and copying of object mappers is expensive and should be avoided. 
+ */ + private static final ObjectMapper OBJECT_MAPPER_INSTANCE; + + static { + OBJECT_MAPPER_INSTANCE = + JacksonMapperFactory.createObjectMapper() + .configure( + JsonReadFeature.ALLOW_UNESCAPED_CONTROL_CHARS.mappedFeature(), + true); + } + + public static <T> T fromJson(String json, Class<T> clazz) { + try { + return OBJECT_MAPPER_INSTANCE.reader().readValue(json, clazz); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public static <T> String toJson(T t) { + try { + return OBJECT_MAPPER_INSTANCE.writerWithDefaultPrettyPrinter().writeValueAsString(t); + } catch (JsonProcessingException e) { + throw new UncheckedIOException(e); + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java new file mode 100644 index 00000000000..490dd29d608 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalog.java @@ -0,0 +1,690 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.connector.file.table.FileSystemConnectorOptions; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.CatalogPropertiesUtil; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogView; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogBaseTable; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.FunctionAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.FunctionNotExistException; +import 
org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionSpecInvalidException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; +import org.apache.flink.table.catalog.exceptions.TablePartitionedException; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.file.testutils.TestFileSystemTableFactory; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.StringUtils; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** A catalog implementation for test {@link FileSystem}. 
*/ +public class TestFileSystemCatalog extends AbstractCatalog { + + public static final String SCHEMA_PATH = "schema"; + public static final String DATA_PATH = "data"; + private static final String SCHEMA_FILE_EXTENSION = ".json"; + + private final String catalogPathStr; + + private Path catalogPath; + private FileSystem fs; + + public TestFileSystemCatalog(String pathStr, String name, String defaultDatabase) { + super(name, defaultDatabase); + this.catalogPathStr = pathStr; + } + + @VisibleForTesting + public String getCatalogPathStr() { + return catalogPathStr; + } + + @Override + public Optional<Factory> getFactory() { + return Optional.of(new TestFileSystemTableFactory()); + } + + @Override + public void open() throws CatalogException { + try { + catalogPath = new Path(catalogPathStr); + fs = catalogPath.getFileSystem(); + if (!fs.exists(catalogPath)) { + throw new CatalogException( + String.format( + "Catalog %s path %s does not exist.", getName(), catalogPath)); + } + if (!fs.getFileStatus(catalogPath).isDir()) { + throw new CatalogException( + String.format( + "Failed to open catalog path. 
The given path %s is not a directory.", + catalogPath)); + } + } catch (IOException e) { + throw new CatalogException( + String.format("Checking catalog path %s exists occur exception.", catalogPath), + e); + } + } + + @Override + public void close() throws CatalogException {} + + @Override + public List<String> listDatabases() throws CatalogException { + try { + FileStatus[] fileStatuses = fs.listStatus(catalogPath); + return Arrays.stream(fileStatuses) + .filter(FileStatus::isDir) + .map(fileStatus -> fileStatus.getPath().getName()) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new CatalogException("Listing database occur exception.", e); + } + } + + @Override + public CatalogDatabase getDatabase(String databaseName) + throws DatabaseNotExistException, CatalogException { + if (databaseExists(databaseName)) { + return new CatalogDatabaseImpl(Collections.emptyMap(), null); + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(databaseName), + "The database name cannot be null or empty."); + + return listDatabases().contains(databaseName); + } + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + if (databaseExists(name)) { + if (ignoreIfExists) { + return; + } else { + throw new DatabaseAlreadyExistException(getName(), name); + } + } + + if (!CollectionUtil.isNullOrEmpty(database.getProperties())) { + throw new CatalogException( + "TestFilesystem catalog doesn't support to create database with options."); + } + + Path dbPath = new Path(catalogPath, name); + try { + fs.mkdirs(dbPath); + } catch (IOException e) { + throw new CatalogException( + String.format("Creating database %s occur exception.", name), e); + } + } + + @Override + public void 
dropDatabase(String databaseName, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException, CatalogException { + if (!databaseExists(databaseName)) { + if (ignoreIfNotExists) { + return; + } else { + throw new DatabaseNotExistException(getName(), databaseName); + } + } + + List<String> tables = listTables(databaseName); + if (!tables.isEmpty() && !cascade) { + throw new DatabaseNotEmptyException(getName(), databaseName); + } + + if (databaseName.equals(getDefaultDatabase())) { + throw new IllegalArgumentException( + "TestFilesystem catalog doesn't support to drop the default database."); + } + + Path dbPath = new Path(catalogPath, databaseName); + try { + fs.delete(dbPath, true); + } catch (IOException e) { + throw new CatalogException( + String.format("Dropping database %s occur exception.", databaseName), e); + } + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("alterDatabase is not implemented."); + } + + @Override + public List<String> listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + Path dbPath = new Path(catalogPath, databaseName); + try { + return Arrays.stream(fs.listStatus(dbPath)) + .filter(FileStatus::isDir) + .map(fileStatus -> fileStatus.getPath().getName()) + .collect(Collectors.toList()); + } catch (IOException e) { + throw new CatalogException( + String.format("Listing table in database %s occur exception.", dbPath), e); + } + } + + @Override + public List<String> listViews(String databaseName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogBaseTable getTable(ObjectPath tablePath) + throws TableNotExistException, 
CatalogException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + final Path tableSchemaPath = + tableSchemaFilePath( + inferTableSchemaPath(catalogPathStr, tablePath), tablePath.getObjectName()); + final Path tableDataPath = inferTableDataPath(catalogPathStr, tablePath); + try { + FileSystemTableInfo tableInfo = + JsonSerdeUtil.fromJson( + readFileUtf8(tableSchemaPath), FileSystemTableInfo.class); + return deserializeTable( + tableInfo.getTableKind(), + tableInfo.getCatalogTableInfo(), + tableDataPath.getPath()); + } catch (IOException e) { + throw new CatalogException( + String.format("Getting table %s occur exception.", tablePath), e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) throws CatalogException { + Path path = inferTablePath(catalogPathStr, tablePath); + Path tableSchemaFilePath = + tableSchemaFilePath( + inferTableSchemaPath(catalogPathStr, tablePath), tablePath.getObjectName()); + try { + return fs.exists(path) && fs.exists(tableSchemaFilePath); + } catch (IOException e) { + throw new CatalogException( + String.format("Checking table %s exists occur exception.", tablePath), e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + return; + } else { + throw new TableNotExistException(getName(), tablePath); + } + } + + Path path = inferTablePath(catalogPathStr, tablePath); + try { + fs.delete(path, true); + } catch (IOException e) { + throw new CatalogException( + String.format("Dropping table %s occur exception.", tablePath), e); + } + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws TableNotExistException, TableAlreadyExistException, CatalogException { + throw new UnsupportedOperationException("renameTable is not implemented."); + } + + @Override + 
public void createTable( + ObjectPath tablePath, CatalogBaseTable catalogTable, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + if (!databaseExists(tablePath.getDatabaseName())) { + throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName()); + } + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } else { + throw new TableAlreadyExistException(getName(), tablePath); + } + } + + if (catalogTable instanceof CatalogView) { + throw new UnsupportedOperationException( + "TestFilesystem catalog doesn't support to CREATE VIEW."); + } + + Tuple4<Path, Path, Path, String> tableSchemaTuple = + getTableJsonInfo(tablePath, catalogTable); + Path path = tableSchemaTuple.f0; + Path tableSchemaPath = tableSchemaTuple.f1; + Path tableDataPath = tableSchemaTuple.f2; + String jsonSchema = tableSchemaTuple.f3; + try { + if (!fs.exists(path)) { + fs.mkdirs(path); + fs.mkdirs(tableSchemaPath); + fs.mkdirs(tableDataPath); + } + + // write table schema + Path tableSchemaFilePath = + tableSchemaFilePath(tableSchemaPath, tablePath.getObjectName()); + try (FSDataOutputStream os = + fs.create(tableSchemaFilePath, FileSystem.WriteMode.NO_OVERWRITE)) { + os.write(jsonSchema.getBytes(StandardCharsets.UTF_8)); + } + + } catch (IOException e) { + throw new CatalogException( + String.format("Create table %s occur exception.", tablePath), e); + } + } + + @Override + public void alterTable( + ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException("alterTable is not implemented"); + } + + @Override + public void alterTable( + ObjectPath tablePath, + CatalogBaseTable newTable, + List<TableChange> tableChanges, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + if (ignoreIfNotExists && !tableExists(tablePath)) { + return; + } + + Tuple4<Path, Path, Path, String> 
tableSchemaInfo = getTableJsonInfo(tablePath, newTable); + Path tableSchemaPath = tableSchemaInfo.f1; + String jsonSchema = tableSchemaInfo.f3; + try { + if (!fs.exists(tableSchemaPath)) { + throw new CatalogException( + String.format( + "Table %s schema file %s doesn't exists.", + tablePath, tableSchemaPath)); + } + // write new table schema + Path tableSchemaFilePath = + tableSchemaFilePath(tableSchemaPath, tablePath.getObjectName()); + try (FSDataOutputStream os = + fs.create(tableSchemaFilePath, FileSystem.WriteMode.OVERWRITE)) { + os.write(jsonSchema.getBytes(StandardCharsets.UTF_8)); + } + + } catch (IOException e) { + throw new CatalogException( + String.format("Altering table %s occur exception.", tablePath), e); + } + } + + @Override + public List<CatalogPartitionSpec> listPartitions(ObjectPath tablePath) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitions( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, CatalogException { + return Collections.emptyList(); + } + + @Override + public List<CatalogPartitionSpec> listPartitionsByFilter( + ObjectPath tablePath, List<Expression> filters) + throws TableNotExistException, TableNotPartitionedException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new PartitionNotExistException(getName(), tablePath, partitionSpec); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + return false; + } + + @Override + public void createPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition 
partition, + boolean ignoreIfExists) + throws TableNotExistException, TableNotPartitionedException, + PartitionSpecInvalidException, PartitionAlreadyExistsException, + CatalogException { + throw new UnsupportedOperationException("createPartition is not implemented."); + } + + @Override + public void dropPartition( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("dropPartition is not implemented."); + } + + @Override + public void alterPartition( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("alterPartition is not implemented."); + } + + @Override + public List<String> listFunctions(String dbName) + throws DatabaseNotExistException, CatalogException { + return Collections.emptyList(); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws FunctionNotExistException, CatalogException { + throw new FunctionNotExistException(getName(), functionPath); + } + + @Override + public boolean functionExists(ObjectPath functionPath) throws CatalogException { + return false; + } + + @Override + public void createFunction( + ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws FunctionAlreadyExistException, DatabaseNotExistException, CatalogException { + throw new UnsupportedOperationException("createFunction is not implemented."); + } + + @Override + public void alterFunction( + ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException("alterFunction is not implemented."); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws 
FunctionNotExistException, CatalogException { + throw new UnsupportedOperationException("dropFunction is not implemented."); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public CatalogTableStatistics getPartitionStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogTableStatistics.UNKNOWN; + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics( + ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + return CatalogColumnStatistics.UNKNOWN; + } + + @Override + public void alterTableStatistics( + ObjectPath tablePath, CatalogTableStatistics tableStatistics, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + throw new UnsupportedOperationException("alterTableStatistics is not implemented."); + } + + @Override + public void alterTableColumnStatistics( + ObjectPath tablePath, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException, TablePartitionedException { + throw new UnsupportedOperationException("alterTableColumnStatistics is not implemented."); + } + + @Override + public void alterPartitionStatistics( + ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException("alterPartitionStatistics is not implemented."); + } + + @Override + public void alterPartitionColumnStatistics( + 
ObjectPath tablePath, + CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new UnsupportedOperationException( + "alterPartitionColumnStatistics is not implemented."); + } + + private Tuple4<Path, Path, Path, String> getTableJsonInfo( + ObjectPath tablePath, CatalogBaseTable catalogTable) { + final Path path = inferTablePath(catalogPathStr, tablePath); + final Path tableSchemaPath = inferTableSchemaPath(catalogPathStr, tablePath); + final Path tableDataPathStr = inferTableDataPath(catalogPathStr, tablePath); + + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable = + (ResolvedCatalogBaseTable<?>) catalogTable; + Map<String, String> catalogTableInfo = serializeTable(resolvedCatalogBaseTable); + CatalogBaseTable.TableKind tableKind = catalogTable.getTableKind(); + FileSystemTableInfo tableInfo = new FileSystemTableInfo(tableKind, catalogTableInfo); + + String jsonSchema = JsonSerdeUtil.toJson(tableInfo); + return Tuple4.of(path, tableSchemaPath, tableDataPathStr, jsonSchema); + } + + /** Read file to UTF_8 decoding. 
*/ + private String readFileUtf8(Path path) throws IOException { + try (FSDataInputStream in = path.getFileSystem().open(path)) { + BufferedReader reader = + new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + StringBuilder builder = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + builder.append(line); + } + return builder.toString(); + } + } + + private Path inferTablePath(String catalogPath, ObjectPath tablePath) { + return new Path( + String.format( + "%s/%s/%s", + catalogPath, tablePath.getDatabaseName(), tablePath.getObjectName())); + } + + private Path inferTableDataPath(String catalogPath, ObjectPath tablePath) { + return new Path( + String.format( + "%s/%s/%s/%s", + catalogPath, + tablePath.getDatabaseName(), + tablePath.getObjectName(), + DATA_PATH)); + } + + private Path inferTableSchemaPath(String catalogPath, ObjectPath tablePath) { + return new Path( + String.format( + "%s/%s/%s/%s", + catalogPath, + tablePath.getDatabaseName(), + tablePath.getObjectName(), + SCHEMA_PATH)); + } + + private Path tableSchemaFilePath(Path tableSchemaPath, String tableName) { + return new Path(tableSchemaPath, tableName + "_schema" + SCHEMA_FILE_EXTENSION); + } + + private Map<String, String> serializeTable( + ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable) { + if (resolvedCatalogBaseTable instanceof ResolvedCatalogTable) { + return CatalogPropertiesUtil.serializeCatalogTable( + (ResolvedCatalogTable) resolvedCatalogBaseTable); + } else if (resolvedCatalogBaseTable instanceof ResolvedCatalogMaterializedTable) { + return CatalogPropertiesUtil.serializeCatalogMaterializedTable( + (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable); + } + + throw new IllegalArgumentException( + "Unknown kind of resolved catalog base table: " + + resolvedCatalogBaseTable.getClass()); + } + + private CatalogBaseTable deserializeTable( + CatalogBaseTable.TableKind tableKind, + Map<String, String> properties, + String 
tableDataPath) { + if (CatalogBaseTable.TableKind.TABLE == tableKind) { + CatalogTable catalogTable = CatalogPropertiesUtil.deserializeCatalogTable(properties); + // put table data path + Map<String, String> options = new HashMap<>(catalogTable.getOptions()); + options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + return catalogTable.copy(options); + } else if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE == tableKind) { + CatalogMaterializedTable catalogMaterializedTable = + CatalogPropertiesUtil.deserializeCatalogMaterializedTable(properties); + // put table data path + Map<String, String> options = new HashMap<>(catalogMaterializedTable.getOptions()); + options.put(FileSystemConnectorOptions.PATH.key(), tableDataPath); + return catalogMaterializedTable.copy(options); + } + + throw new IllegalArgumentException("Unknown catalog base table kind: " + tableKind); + } + + /** The pojo class represents serializable catalog base table info. */ + public static class FileSystemTableInfo { + + private final CatalogBaseTable.TableKind tableKind; + private final Map<String, String> catalogTableInfo; + + @JsonCreator + public FileSystemTableInfo( + @JsonProperty("tableKind") CatalogBaseTable.TableKind tableKind, + @JsonProperty("catalogTableInfo") Map<String, String> catalogTableInfo) { + this.tableKind = tableKind; + this.catalogTableInfo = catalogTableInfo; + } + + public CatalogBaseTable.TableKind getTableKind() { + return tableKind; + } + + public Map<String, String> getCatalogTableInfo() { + return catalogTableInfo; + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java new file mode 100644 index 00000000000..41999468d52 --- /dev/null +++ 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactory.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.table.catalog.Catalog; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; + +import static org.apache.flink.table.file.testutils.TestFileSystemTableFactory.IDENTIFIER; + +/** Factory for {@link TestFileSystemCatalog}. */ +public class TestFileSystemCatalogFactory implements CatalogFactory { + + public static final ConfigOption<String> PATH = + ConfigOptions.key("path") + .stringType() + .noDefaultValue() + .withDescription( + "Catalog base DFS path, used for inferring the sink table path. 
" + + "The default strategy for a table path is: ${catalog.path}/${db_name}/${table_name}"); + + public static final ConfigOption<String> DEFAULT_DATABASE = + ConfigOptions.key(CommonCatalogOptions.DEFAULT_DATABASE_KEY) + .stringType() + .defaultValue("default"); + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtil.createCatalogFactoryHelper(this, context); + helper.validate(); + return new TestFileSystemCatalog( + helper.getOptions().get(PATH), + context.getName(), + helper.getOptions().get(DEFAULT_DATABASE)); + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> set = new HashSet<>(); + set.add(PATH); + set.add(DEFAULT_DATABASE); + return set; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000000..7977248c5be --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalogFactory +org.apache.flink.table.file.testutils.TestFileSystemTableFactory diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java new file mode 100644 index 00000000000..b02a7f124ad --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/TestFileSystemTableFactoryTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.file.testutils; + +import org.apache.flink.connector.file.table.FileSystemTableSink; +import org.apache.flink.connector.file.table.TestFileSystemTableSource; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.FactoryUtil; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink; +import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TestFileSystemTableFactory}. */ +public class TestFileSystemTableFactoryTest { + + private static final ResolvedSchema SCHEMA = + ResolvedSchema.of( + Column.physical("f0", DataTypes.STRING()), + Column.physical("f1", DataTypes.BIGINT()), + Column.physical("f2", DataTypes.BIGINT())); + + @Test + void testCreateSourceSink() { + Map<String, String> options = new HashMap<>(); + options.put(FactoryUtil.CONNECTOR.key(), "test-filesystem"); + options.put("path", "/tmp"); + options.put("format", "testcsv"); + + // test ignore format options + options.put("testcsv.my_option", "my_value"); + + DynamicTableSource source = createTableSource(SCHEMA, options); + assertThat(source).isInstanceOf(TestFileSystemTableSource.class); + + DynamicTableSink sink = createTableSink(SCHEMA, options); + assertThat(sink).isInstanceOf(FileSystemTableSink.class); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java 
b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java new file mode 100644 index 00000000000..4b77a979f93 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogFactoryTest.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.types.Row; +import org.apache.flink.util.CollectionUtil; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link TestFileSystemCatalog} created by {@link TestFileSystemCatalogFactory}. 
*/ +public class TestFileSystemCatalogFactoryTest { + + private static final String TEST_CATALOG = "test_catalog"; + private static final String TEST_DEFAULT_DATABASE = "test_db"; + + @TempDir File tempFile; + + @BeforeEach + void before() { + File testDb = new File(tempFile, TEST_DEFAULT_DATABASE); + testDb.mkdir(); + } + + @Test + public void testCreateCatalogWithDDL() { + final TableEnvironment tEnv = + TableEnvironment.create(EnvironmentSettings.inStreamingMode()); + String catalogDDL = + String.format( + "CREATE CATALOG %s\n" + + "WITH (\n" + + " 'type' = 'test-filesystem',\n" + + " 'path' = '%s',\n" + + " 'default-database' = '%s'\n" + + " )", + TEST_CATALOG, tempFile.getAbsolutePath(), TEST_DEFAULT_DATABASE); + + tEnv.executeSql(catalogDDL); + + List<Row> result = + CollectionUtil.iteratorToList(tEnv.executeSql("SHOW CATALOGS").collect()); + assertThat(result.contains(Row.of(TEST_CATALOG))).isTrue(); + } + + @Test + public void testCreateCatalogWithFactory() { + final Map<String, String> properties = new HashMap<>(); + properties.put("type", "test-filesystem"); + properties.put("default-database", TEST_DEFAULT_DATABASE); + properties.put("path", tempFile.getAbsolutePath()); + final TestFileSystemCatalog actualCatalog = + (TestFileSystemCatalog) + FactoryUtil.createCatalog( + TEST_CATALOG, + properties, + new Configuration(), + TestFileSystemCatalogFactoryTest.class.getClassLoader()); + + assertThat(actualCatalog.getCatalogPathStr()).isEqualTo(tempFile.getAbsolutePath()); + assertThat(actualCatalog.getName()).isEqualTo(TEST_CATALOG); + assertThat(actualCatalog.getDefaultDatabase()).isEqualTo(TEST_DEFAULT_DATABASE); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java new file mode 100644 index 
00000000000..e418f6c80a1 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogITCase.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for {@link TestFileSystemCatalog}. */ +public class TestFileSystemCatalogITCase extends TestFileSystemCatalogTestBase { + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testReadAndWriteTestFileSystemTable(boolean isStreamingMode) throws Exception { + TableEnvironment tEnv = + TableEnvironment.create( + isStreamingMode + ? 
EnvironmentSettings.inStreamingMode() + : EnvironmentSettings.inBatchMode()); + + tEnv.registerCatalog(TEST_CATALOG, catalog); + tEnv.useCatalog(TEST_CATALOG); + + tEnv.executeSql( + "CREATE TABLE CsvTable (\n" + + " id BIGINT,\n" + + " user_name STRING,\n" + + " message STRING,\n" + + " log_ts STRING\n" + + ") WITH (\n" + + " 'format' = 'csv'\n" + + ")"); + + tEnv.getConfig().getConfiguration().setString("parallelism.default", "1"); + tEnv.executeSql( + String.format( + "INSERT INTO %s.%s.CsvTable VALUES\n" + + "(1001, 'user1', 'hello world', '2021-06-10 10:00:00'),\n" + + "(1002, 'user2', 'hi', '2021-06-10 10:01:00'),\n" + + "(1003, 'user3', 'ciao', '2021-06-10 10:02:00'),\n" + + "(1004, 'user4', '你好', '2021-06-10 10:03:00')", + TEST_CATALOG, TEST_DEFAULT_DATABASE)) + .await(); + + CloseableIterator<Row> result = + tEnv.executeSql( + String.format( + "SELECT * FROM %s.%s.CsvTable", + TEST_CATALOG, TEST_DEFAULT_DATABASE)) + .collect(); + + // assert query result + List<Row> expected = + Arrays.asList( + Row.of(1001L, "user1", "hello world", "2021-06-10 10:00:00"), + Row.of(1002L, "user2", "hi", "2021-06-10 10:01:00"), + Row.of(1003L, "user3", "ciao", "2021-06-10 10:02:00"), + Row.of(1004L, "user4", "你好", "2021-06-10 10:03:00")); + + assertThat(expected).isEqualTo((Lists.newArrayList(result))); + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java new file mode 100644 index 00000000000..07b11583d4f --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTest.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.Schema; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; +import org.apache.flink.table.catalog.CatalogMaterializedTable; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.TestSchemaResolver; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.refresh.RefreshHandler; + +import 
org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.flink.connector.file.table.FileSystemConnectorOptions.PATH; +import static org.apache.flink.table.file.testutils.catalog.TestFileSystemCatalog.DATA_PATH; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Test for {@link TestFileSystemCatalog}. */ +public class TestFileSystemCatalogTest extends TestFileSystemCatalogTestBase { + + private static final List<Column> CREATE_COLUMNS = + Arrays.asList( + Column.physical("id", DataTypes.BIGINT()), + Column.physical("name", DataTypes.VARCHAR(20)), + Column.physical("age", DataTypes.INT()), + Column.physical("tss", DataTypes.TIMESTAMP(3)), + Column.physical("partition", DataTypes.VARCHAR(10))); + private static final UniqueConstraint CONSTRAINTS = + UniqueConstraint.primaryKey("primary_constraint", Collections.singletonList("id")); + private static final List<String> PARTITION_KEYS = Collections.singletonList("partition"); + + private static final ResolvedSchema CREATE_RESOLVED_SCHEMA = + new ResolvedSchema(CREATE_COLUMNS, Collections.emptyList(), CONSTRAINTS); + + private static final Schema CREATE_SCHEMA = + Schema.newBuilder().fromResolvedSchema(CREATE_RESOLVED_SCHEMA).build(); + + private static final Map<String, String> EXPECTED_OPTIONS = new HashMap<>(); + + static { + EXPECTED_OPTIONS.put("source.monitor-interval", "5S"); + EXPECTED_OPTIONS.put("auto-compaction", "true"); + } + + private static final ResolvedCatalogTable EXPECTED_CATALOG_TABLE = + new ResolvedCatalogTable( + CatalogTable.newBuilder() + .schema(CREATE_SCHEMA) + .comment("test table") + .partitionKeys(PARTITION_KEYS) + .options(EXPECTED_OPTIONS) + .build(), + CREATE_RESOLVED_SCHEMA); + + private static final String DEFINITION_QUERY = 
"SELECT id, region, county FROM T"; + private static final Duration FRESHNESS = Duration.ofMinutes(3); + private static final ResolvedCatalogMaterializedTable EXPECTED_CATALOG_MATERIALIZED_TABLE = + new ResolvedCatalogMaterializedTable( + CatalogMaterializedTable.newBuilder() + .schema(CREATE_SCHEMA) + .comment("test materialized table") + .partitionKeys(PARTITION_KEYS) + .options(EXPECTED_OPTIONS) + .definitionQuery(DEFINITION_QUERY) + .freshness(FRESHNESS) + .logicalRefreshMode( + CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC) + .refreshMode(CatalogMaterializedTable.RefreshMode.CONTINUOUS) + .refreshStatus(CatalogMaterializedTable.RefreshStatus.INITIALIZING) + .build(), + CREATE_RESOLVED_SCHEMA); + + private static final TestRefreshHandler REFRESH_HANDLER = + new TestRefreshHandler("jobID: xxx, clusterId: yyy"); + + @Test + public void testListDatabases() { + List<String> actual = catalog.listDatabases(); + assertThat(actual.contains(TEST_DEFAULT_DATABASE)).isTrue(); + assertThat(actual.contains(NONE_EXIST_DATABASE)).isFalse(); + } + + @Test + public void testDatabaseExists() { + assertThat(catalog.databaseExists(TEST_DEFAULT_DATABASE)).isTrue(); + assertThat(catalog.databaseExists(NONE_EXIST_DATABASE)).isFalse(); + } + + @Test + public void testCreateAndDropDatabase() throws Exception { + CatalogDatabase expected = new CatalogDatabaseImpl(Collections.emptyMap(), null); + catalog.createDatabase("db1", expected, true); + + CatalogDatabase actual = catalog.getDatabase("db1"); + assertThat(catalog.listDatabases().contains("db1")).isTrue(); + assertThat(actual.getProperties()).isEqualTo(expected.getProperties()); + + // create exist database + assertThrows( + DatabaseAlreadyExistException.class, + () -> catalog.createDatabase("db1", expected, false)); + + // drop exist database + catalog.dropDatabase("db1", true); + assertThat(catalog.listDatabases().contains("db1")).isFalse(); + + // drop non-exist database + assertThrows( + DatabaseNotExistException.class, 
+ () -> catalog.dropDatabase(NONE_EXIST_DATABASE, false)); + } + + @Test + public void testCreateDatabaseWithOptions() { + Map<String, String> options = new HashMap<>(); + options.put("k1", "v1"); + options.put("k2", "v2"); + + assertThrows( + CatalogException.class, + () -> catalog.createDatabase("db1", new CatalogDatabaseImpl(options, null), true)); + } + + @Test + public void testCreateAndGetCatalogTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // test create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test table exist + assertThat(catalog.tableExists(tablePath)).isTrue(); + + Map<String, String> expectedOptions = new HashMap<>(EXPECTED_OPTIONS); + expectedOptions.put( + PATH.key(), + String.format( + "%s/%s/%s/%s", + tempFile.getAbsolutePath(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + DATA_PATH)); + + // test get table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate table type + assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE); + + CatalogTable actualCatalogTable = (CatalogTable) actualTable; + // validate schema + assertThat(actualCatalogTable.getUnresolvedSchema().resolve(new TestSchemaResolver())) + .isEqualTo(CREATE_RESOLVED_SCHEMA); + // validate partition key + assertThat(actualCatalogTable.getPartitionKeys()).isEqualTo(PARTITION_KEYS); + // validate options + assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedOptions); + + // test create exist table + assertThrows( + TableAlreadyExistException.class, + () -> catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, false)); + } + + @Test + public void testCreateAndGetCatalogMaterializedTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2"); + // test create materialized table + catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, true); + + // test materialized table exist + 
assertThat(catalog.tableExists(tablePath)).isTrue(); + + Map<String, String> expectedOptions = new HashMap<>(EXPECTED_OPTIONS); + expectedOptions.put( + PATH.key(), + String.format( + "%s/%s/%s/%s", + tempFile.getAbsolutePath(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + DATA_PATH)); + + // test get materialized table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate table type + assertThat(actualTable.getTableKind()) + .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE); + + CatalogMaterializedTable actualMaterializedTable = (CatalogMaterializedTable) actualTable; + // validate schema + assertThat(actualMaterializedTable.getUnresolvedSchema().resolve(new TestSchemaResolver())) + .isEqualTo(CREATE_RESOLVED_SCHEMA); + // validate partition key + assertThat(actualMaterializedTable.getPartitionKeys()).isEqualTo(PARTITION_KEYS); + // validate options + assertThat(actualMaterializedTable.getOptions()).isEqualTo(expectedOptions); + // validate definition query + assertThat(actualMaterializedTable.getDefinitionQuery()).isEqualTo(DEFINITION_QUERY); + // validate freshness + assertThat(actualMaterializedTable.getFreshness()).isEqualTo(FRESHNESS); + // validate logical refresh mode + assertThat(actualMaterializedTable.getLogicalRefreshMode()) + .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC); + // validate refresh mode + assertThat(actualMaterializedTable.getRefreshMode()) + .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS); + // validate refresh status + assertThat(actualMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.INITIALIZING); + // validate refresh handler + assertThat(actualMaterializedTable.getRefreshHandlerDescription()) + .isEqualTo(Optional.empty()); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNull(); + + // test create exist materialized table + assertThrows( + TableAlreadyExistException.class, + () -> 
catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, false)); + } + + @Test + public void testListTable() throws Exception { + ObjectPath tablePath1 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + ObjectPath tablePath2 = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2"); + + // create table + catalog.createTable(tablePath1, EXPECTED_CATALOG_TABLE, true); + catalog.createTable(tablePath2, EXPECTED_CATALOG_MATERIALIZED_TABLE, true); + + // test list table + List<String> tables = catalog.listTables(TEST_DEFAULT_DATABASE); + assertThat(tables.contains(tablePath1.getObjectName())).isTrue(); + assertThat(tables.contains(tablePath2.getObjectName())).isTrue(); + + // test list non-exist database table + assertThrows( + DatabaseNotExistException.class, () -> catalog.listTables(NONE_EXIST_DATABASE)); + } + + @Test + public void testAlterCatalogTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // test create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test table exist + assertThat(catalog.tableExists(tablePath)).isTrue(); + + // alter table options + Map<String, String> options = new HashMap<>(); + options.put("auto-compaction", "true"); + options.put("sink.parallelism", "5"); + ResolvedCatalogTable updatedResolvedTable = EXPECTED_CATALOG_TABLE.copy(options); + catalog.alterTable(tablePath, updatedResolvedTable, Collections.emptyList(), false); + + // test get table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate table type + assertThat(actualTable.getTableKind()).isEqualTo(CatalogBaseTable.TableKind.TABLE); + + // validate options + Map<String, String> expectedOptions = new HashMap<>(options); + expectedOptions.put( + PATH.key(), + String.format( + "%s/%s/%s/%s", + tempFile.getAbsolutePath(), + tablePath.getDatabaseName(), + tablePath.getObjectName(), + DATA_PATH)); + assertThat(actualTable.getOptions()).isEqualTo(expectedOptions); + } + + @Test + public 
void testAlterCatalogMaterializedTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb2"); + // test create materialized table + catalog.createTable(tablePath, EXPECTED_CATALOG_MATERIALIZED_TABLE, true); + + // test materialized table exist + assertThat(catalog.tableExists(tablePath)).isTrue(); + + // alter materialized table refresh handler + ResolvedCatalogMaterializedTable updatedMaterializedTable = + EXPECTED_CATALOG_MATERIALIZED_TABLE.copy( + CatalogMaterializedTable.RefreshStatus.ACTIVATED, + REFRESH_HANDLER.asSummaryString(), + REFRESH_HANDLER.toBytes()); + catalog.alterTable(tablePath, updatedMaterializedTable, Collections.emptyList(), false); + + // test get materialized table + CatalogBaseTable actualTable = catalog.getTable(tablePath); + // validate table type + assertThat(actualTable.getTableKind()) + .isEqualTo(CatalogBaseTable.TableKind.MATERIALIZED_TABLE); + + CatalogMaterializedTable actualMaterializedTable = (CatalogMaterializedTable) actualTable; + // validate refresh status + assertThat(actualMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); + // validate refresh handler + assertThat(actualMaterializedTable.getRefreshHandlerDescription().get()) + .isEqualTo(REFRESH_HANDLER.asSummaryString()); + assertThat(actualMaterializedTable.getSerializedRefreshHandler()) + .isEqualTo(REFRESH_HANDLER.toBytes()); + } + + @Test + public void testDropTable() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb1"); + // create table + catalog.createTable(tablePath, EXPECTED_CATALOG_TABLE, true); + + // test drop table + catalog.dropTable(tablePath, true); + assertThat(catalog.tableExists(tablePath)).isFalse(); + + // drop non-exist table + assertThrows( + TableNotExistException.class, + () -> catalog.dropTable(new ObjectPath(TEST_DEFAULT_DATABASE, "non_exist"), false)); + } + + private static class TestRefreshHandler implements 
RefreshHandler { + + private final String handlerString; + + public TestRefreshHandler(String handlerString) { + this.handlerString = handlerString; + } + + @Override + public String asSummaryString() { + return "test refresh handler"; + } + + public byte[] toBytes() { + return handlerString.getBytes(); + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java new file mode 100644 index 00000000000..8cd5fc306e6 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/java/org/apache/flink/table/file/testutils/catalog/TestFileSystemCatalogTestBase.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.file.testutils.catalog; + +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +/** Base class for test filesystem catalog. */ +public abstract class TestFileSystemCatalogTestBase extends AbstractTestBase { + + protected static final String TEST_CATALOG = "test_catalog"; + protected static final String TEST_DEFAULT_DATABASE = "test_db"; + protected static final String NONE_EXIST_DATABASE = "none_exist_database"; + + protected TestFileSystemCatalog catalog; + + @TempDir File tempFile; + + @BeforeEach + void before() { + File testDb = new File(tempFile, TEST_DEFAULT_DATABASE); + testDb.mkdir(); + + String catalogPathStr = tempFile.getAbsolutePath(); + catalog = new TestFileSystemCatalog(catalogPathStr, TEST_CATALOG, TEST_DEFAULT_DATABASE); + catalog.open(); + } + + @AfterEach + void close() { + if (catalog != null) { + catalog.close(); + } + } +} diff --git a/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties new file mode 100644 index 00000000000..8bb9fe60fd0 --- /dev/null +++ b/flink-test-utils-parent/flink-table-filesystem-test-utils/src/test/resources/log4j2-test.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=OFF +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n diff --git a/flink-test-utils-parent/pom.xml b/flink-test-utils-parent/pom.xml index 0c6a7dc08ce..96273501e20 100644 --- a/flink-test-utils-parent/pom.xml +++ b/flink-test-utils-parent/pom.xml @@ -40,6 +40,7 @@ under the License. <module>flink-connector-test-utils</module> <module>flink-clients-test-utils</module> <module>flink-migration-test-utils</module> + <module>flink-table-filesystem-test-utils</module> </modules> </project>