This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 54fd3ab2 [FLINK-30398] Introduce S3 support for table store
54fd3ab2 is described below

commit 54fd3ab2d16999d628faf4ff62c701b12ca20725
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 14 15:43:33 2022 +0800

    [FLINK-30398] Introduce S3 support for table store
    
    This closes #436.
---
 docs/content/docs/filesystems/overview.md          |   3 +-
 docs/content/docs/filesystems/s3.md                |  70 ++++++++++++
 .../flink/table/store/codegen/CodeGenLoader.java   | 116 ++-----------------
 .../flink/table/store/filesystem/FileSystems.java  |  81 +++++++++++++
 .../table/store/plugin}/ComponentClassLoader.java  |   2 +-
 .../flink/table/store/plugin/FileSystemLoader.java |  27 +++++
 .../flink/table/store/plugin/PluginLoader.java     |  49 ++++----
 .../flink/table/store/codegen/CodeGenUtils.java    |  10 +-
 .../table/store/file/catalog/CatalogFactory.java   |  13 ++-
 .../flink-table-store-oss/pom.xml                  |  57 ++++++----
 .../apache/flink/table/store/oss/OSSLoader.java    |  48 ++++++++
 ...pache.flink.table.store.plugin.FileSystemLoader |  16 +++
 .../pom.xml                                        |  63 +++++++----
 .../org/apache/flink/table/store/s3/S3Loader.java  |  48 ++++++++
 ...pache.flink.table.store.plugin.FileSystemLoader |  16 +++
 flink-table-store-filesystems/pom.xml              |   1 +
 .../apache/flink/table/store/hive/HiveSchema.java  |   9 +-
 flink-table-store-spark/pom.xml                    |  31 +++++
 .../flink/table/store/spark/SparkCatalog.java      |   6 +-
 .../flink/table/store/spark/SparkSource.java       |   7 +-
 .../table/store/spark/MinioTestContainer.java      | 126 +++++++++++++++++++++
 .../flink/table/store/spark/SparkS3ITCase.java     |  71 ++++++++++++
 .../flink/table/store/spark/SparkSource.java       |   7 +-
 23 files changed, 671 insertions(+), 206 deletions(-)

diff --git a/docs/content/docs/filesystems/overview.md 
b/docs/content/docs/filesystems/overview.md
index 232801d7..82be82fb 100644
--- a/docs/content/docs/filesystems/overview.md
+++ b/docs/content/docs/filesystems/overview.md
@@ -39,4 +39,5 @@ FileSystem pluggable jars for user to query tables from 
Spark/Hive side.
 
|:------------------|:-----------------|-----------|:--------------------------------|
 | Local File System | file://          | N         | Built-in Support          
      |
 | HDFS              | hdfs://          | N         | Built-in Support, ensure 
that the cluster is in the hadoop environment |
-| Aliyun OSS        | oss://           | Y         | Tested on Spark3.3 and 
Hive 3.1 |
+| Aliyun OSS        | oss://           | Y         |  |
+| S3                | s3://            | Y         |  |
diff --git a/docs/content/docs/filesystems/s3.md 
b/docs/content/docs/filesystems/s3.md
new file mode 100644
index 00000000..d62148cb
--- /dev/null
+++ b/docs/content/docs/filesystems/s3.md
@@ -0,0 +1,70 @@
+---
+title: "S3"
+weight: 3
+type: docs
+aliases:
+- /filesystems/s3.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# S3
+
+## Build
+
+[Build Flink Table Store]({{< ref "docs/engines/build" >}}), you can find the 
shaded jars under
+`./flink-table-store-filesystems/flink-table-store-s3/target/flink-table-store-s3-{{<
 version >}}.jar`.
+
+## Use
+
+- On Flink side, configure `flink-conf.yaml` like
+    ```yaml
+    s3.endpoint: your-endpoint-hostname
+    s3.access-key: xxx
+    s3.secret-key: yyy
+    ```
+
+- On Spark side, place `flink-table-store-s3-{{< version >}}.jar` together 
with `flink-table-store-spark-{{< version >}}.jar` under Spark's jars 
directory, and start like
+  - Spark SQL
+    ```shell
+    spark-sql \ 
+      --conf 
spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+      --conf spark.sql.catalog.tablestore.warehouse=s3://<bucket>/<endpoint> \
+      --conf spark.sql.catalog.tablestore.s3.endpoint=your-endpoint-hostname \
+      --conf spark.sql.catalog.tablestore.s3.access-key=xxx \
+      --conf spark.sql.catalog.tablestore.s3.secret-key=yyy
+    ```
+- On Hive side, place `flink-table-store-s3-{{< version >}}.jar` together with 
`flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib 
directory, and start like
+  - Hive Catalog
+    ```sql
+    SET tablestore.s3.endpoint=your-endpoint-hostname;
+    SET tablestore.s3.access-key=xxx;
+    SET tablestore.s3.secret-key=yyy;
+
+    CREATE EXTERNAL TABLE external_test_table
+    STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
+    LOCATION 's3://<bucket>/<endpoint>';
+    ```
+
+## Non-S3
+The S3 Filesystem also support using S3 compliant object stores such as IBM’s
+Cloud Object Storage and MinIO. To do so, configure your endpoint.
+  ```yaml
+  s3.endpoint: your-endpoint-hostname
+  ```
diff --git 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
 
b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
index 6bba6a4c..bf0ca863 100644
--- 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
+++ 
b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
@@ -18,124 +18,26 @@
 
 package org.apache.flink.table.store.codegen;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.table.store.utils.LocalFileUtils;
-import org.apache.flink.util.IOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ServiceLoader;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.flink.table.store.plugin.PluginLoader;
 
 /** Copied and modified from the flink-table-planner-loader module. */
 public class CodeGenLoader {
 
-    static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR = 
"flink-table-store-codegen.jar";
-
-    public static final String[] PARENT_FIRST_LOGGING_PATTERNS =
-            new String[] {
-                "org.slf4j",
-                "org.apache.log4j",
-                "org.apache.logging",
-                "org.apache.commons.logging",
-                "ch.qos.logback"
-            };
-
-    private static final String[] OWNER_CLASSPATH =
-            Stream.concat(
-                            Arrays.stream(PARENT_FIRST_LOGGING_PATTERNS),
-                            Stream.of(
-                                    // These packages are shipped either by
-                                    // flink-table-runtime or flink-dist itself
-                                    "org.codehaus.janino",
-                                    "org.codehaus.commons",
-                                    "org.apache.commons.lang3"))
-                    .toArray(String[]::new);
-
-    private static final String[] COMPONENT_CLASSPATH = new String[] 
{"org.apache.flink"};
-
-    private final ClassLoader submoduleClassLoader;
-
-    private CodeGenLoader() {
-        try {
-            final ClassLoader flinkClassLoader = 
CodeGenLoader.class.getClassLoader();
-            final Path tmpDirectory =
-                    Paths.get(ConfigurationUtils.parseTempDirectories(new 
Configuration())[0]);
-            Files.createDirectories(
-                    
LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
-            Path delegateJar =
-                    extractResource(
-                            FLINK_TABLE_STORE_CODEGEN_FAT_JAR,
-                            flinkClassLoader,
-                            tmpDirectory,
-                            "Flink table store codegen could not be found.\n"
-                                    + "If you're running a test, please make 
sure you've built the codegen modules by running\n"
-                                    + "mvn clean package -pl 
flink-table-store-codegen,flink-table-store-codegen-loader -DskipTests");
-            this.submoduleClassLoader =
-                    new ComponentClassLoader(
-                            new URL[] {delegateJar.toUri().toURL()},
-                            flinkClassLoader,
-                            OWNER_CLASSPATH,
-                            COMPONENT_CLASSPATH);
-        } catch (IOException e) {
-            throw new RuntimeException(
-                    "Could not initialize the flink-table-store-codegen 
loader.", e);
-        }
-    }
-
-    private Path extractResource(
-            String resourceName,
-            ClassLoader flinkClassLoader,
-            Path tmpDirectory,
-            String errorMessage)
-            throws IOException {
-        String[] splitName = resourceName.split("\\.");
-        splitName[0] += "_" + UUID.randomUUID();
-        final Path tempFile = 
Files.createFile(tmpDirectory.resolve(String.join(".", splitName)));
-        final InputStream resourceStream = 
flinkClassLoader.getResourceAsStream(resourceName);
-        if (resourceStream == null) {
-            throw new RuntimeException(errorMessage);
-        }
-        IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
-        return tempFile;
-    }
+    private static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR = 
"flink-table-store-codegen.jar";
 
     // Singleton lazy initialization
 
-    private static CodeGenLoader instance;
+    private static PluginLoader loader;
 
-    public static synchronized CodeGenLoader getInstance() {
-        if (instance == null) {
+    private static synchronized PluginLoader getLoader() {
+        if (loader == null) {
             // Avoid NoClassDefFoundError without cause by exception
-            instance = new CodeGenLoader();
+            loader = new PluginLoader(FLINK_TABLE_STORE_CODEGEN_FAT_JAR);
         }
-        return instance;
+        return loader;
     }
 
-    public <T> T discover(Class<T> clazz) {
-        List<T> results = new ArrayList<>();
-        ServiceLoader.load(clazz, 
submoduleClassLoader).iterator().forEachRemaining(results::add);
-        if (results.size() != 1) {
-            throw new RuntimeException(
-                    "Found "
-                            + results.size()
-                            + " classes implementing "
-                            + clazz.getName()
-                            + ". They are:\n"
-                            + results.stream()
-                                    .map(t -> t.getClass().getName())
-                                    .collect(Collectors.joining("\n")));
-        }
-        return results.get(0);
+    public static CodeGenerator getCodeGenerator() {
+        return getLoader().discover(CodeGenerator.class);
     }
 }
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
new file mode 100644
index 00000000..cdc10ffe
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+
+/**
+ * A new {@link FileSystem} loader to support:
+ *
+ * <ul>
+ *   <li>Support access to file systems in Hive, Spark, Trino and other 
computing engines.
+ *   <li>TODO: Support access to multiple users' file systems in Flink 
clusters.
+ * </ul>
+ */
+public class FileSystems {
+
+    public static void initialize(Path path, Configuration configuration) {
+        // 1. Try to load file system
+        try {
+            // check can obtain
+            FileSystem fs = path.getFileSystem();
+            // check can read
+            fs.getFileStatus(path);
+            return;
+        } catch (IOException ignored) {
+        }
+
+        // 2. initialize
+        FileSystem.initialize(
+                configuration,
+                new PluginManager() {
+                    @Override
+                    public <P> Iterator<P> load(Class<P> service) {
+                        return (Iterator<P>) discoverFactories().iterator();
+                    }
+                });
+
+        try {
+            SecurityUtils.install(new SecurityConfiguration(configuration));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static List<FileSystemFactory> discoverFactories() {
+        List<FileSystemFactory> results = new ArrayList<>();
+        ServiceLoader.load(FileSystemLoader.class, 
FileSystemLoader.class.getClassLoader())
+                .iterator()
+                .forEachRemaining(loader -> results.add(loader.load()));
+        return results;
+    }
+}
diff --git 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
similarity index 99%
rename from 
flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
rename to 
flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
index 052eb5d2..3f43ca5c 100644
--- 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.codegen;
+package org.apache.flink.table.store.plugin;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.function.FunctionWithException;
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
new file mode 100644
index 00000000..c06b52ad
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.plugin;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/** Loader to load {@link FileSystemFactory}. */
+public interface FileSystemLoader {
+
+    FileSystemFactory load();
+}
diff --git 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
similarity index 79%
copy from 
flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
copy to 
flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
index 6bba6a4c..bd2d24a7 100644
--- 
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.store.codegen;
+package org.apache.flink.table.store.plugin;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
@@ -37,10 +37,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
-/** Copied and modified from the flink-table-planner-loader module. */
-public class CodeGenLoader {
-
-    static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR = 
"flink-table-store-codegen.jar";
+/** Loader to load plugin jar. */
+public class PluginLoader {
 
     public static final String[] PARENT_FIRST_LOGGING_PATTERNS =
             new String[] {
@@ -66,25 +64,27 @@ public class CodeGenLoader {
 
     private final ClassLoader submoduleClassLoader;
 
-    private CodeGenLoader() {
+    public PluginLoader(String jarName) {
         try {
-            final ClassLoader flinkClassLoader = 
CodeGenLoader.class.getClassLoader();
-            final Path tmpDirectory =
+            ClassLoader ownerClassLoader = PluginLoader.class.getClassLoader();
+            Path tmpDirectory =
                     Paths.get(ConfigurationUtils.parseTempDirectories(new 
Configuration())[0]);
             Files.createDirectories(
                     
LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
             Path delegateJar =
                     extractResource(
-                            FLINK_TABLE_STORE_CODEGEN_FAT_JAR,
-                            flinkClassLoader,
+                            jarName,
+                            ownerClassLoader,
                             tmpDirectory,
-                            "Flink table store codegen could not be found.\n"
-                                    + "If you're running a test, please make 
sure you've built the codegen modules by running\n"
-                                    + "mvn clean package -pl 
flink-table-store-codegen,flink-table-store-codegen-loader -DskipTests");
+                            String.format(
+                                    "%s could not be found.\n"
+                                            + "If you're running a test, 
please make sure you've built the modules by running\n"
+                                            + "mvn clean install -DskipTests",
+                                    jarName));
             this.submoduleClassLoader =
                     new ComponentClassLoader(
                             new URL[] {delegateJar.toUri().toURL()},
-                            flinkClassLoader,
+                            ownerClassLoader,
                             OWNER_CLASSPATH,
                             COMPONENT_CLASSPATH);
         } catch (IOException e) {
@@ -110,18 +110,6 @@ public class CodeGenLoader {
         return tempFile;
     }
 
-    // Singleton lazy initialization
-
-    private static CodeGenLoader instance;
-
-    public static synchronized CodeGenLoader getInstance() {
-        if (instance == null) {
-            // Avoid NoClassDefFoundError without cause by exception
-            instance = new CodeGenLoader();
-        }
-        return instance;
-    }
-
     public <T> T discover(Class<T> clazz) {
         List<T> results = new ArrayList<>();
         ServiceLoader.load(clazz, 
submoduleClassLoader).iterator().forEachRemaining(results::add);
@@ -138,4 +126,13 @@ public class CodeGenLoader {
         }
         return results.get(0);
     }
+
+    @SuppressWarnings("unchecked")
+    public <T> T newInstance(String name) {
+        try {
+            return (T) submoduleClassLoader.loadClass(name).newInstance();
+        } catch (ClassNotFoundException | InstantiationException | 
IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
index c8775012..a7175fe9 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
@@ -34,25 +34,21 @@ public class CodeGenUtils {
             return EMPTY_PROJECTION;
         }
 
-        return CodeGenLoader.getInstance()
-                .discover(CodeGenerator.class)
+        return CodeGenLoader.getCodeGenerator()
                 .generateProjection("Projection", inputType, mapping)
                 .newInstance(CodeGenUtils.class.getClassLoader());
     }
 
     public static NormalizedKeyComputer newNormalizedKeyComputer(
             List<LogicalType> fieldTypes, String name) {
-        return CodeGenLoader.getInstance()
-                .discover(CodeGenerator.class)
+        return CodeGenLoader.getCodeGenerator()
                 .generateNormalizedKeyComputer(fieldTypes, name)
                 .newInstance(CodeGenUtils.class.getClassLoader());
     }
 
     public static GeneratedClass<RecordComparator> generateRecordComparator(
             List<LogicalType> fieldTypes, String name) {
-        return CodeGenLoader.getInstance()
-                .discover(CodeGenerator.class)
-                .generateRecordComparator(fieldTypes, name);
+        return 
CodeGenLoader.getCodeGenerator().generateRecordComparator(fieldTypes, name);
     }
 
     public static RecordComparator newRecordComparator(List<LogicalType> 
fieldTypes, String name) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index de6c97ab..66129710 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -41,14 +41,19 @@ public interface CatalogFactory {
 
     Catalog create(String warehouse, Configuration options);
 
-    static Catalog createCatalog(Configuration options) {
-        // manual validation
-        // because different catalog types may have different options
-        // we can't list them all in the optionalOptions() method
+    static Path warehouse(Configuration options) {
         String warehouse =
                 Preconditions.checkNotNull(
                         options.get(WAREHOUSE),
                         "Table store '" + WAREHOUSE.key() + "' path must be 
set");
+        return new Path(warehouse);
+    }
+
+    static Catalog createCatalog(Configuration options) {
+        // manual validation
+        // because different catalog types may have different options
+        // we can't list them all in the optionalOptions() method
+        String warehouse = warehouse(options).toUri().toString();
 
         String metastore = options.get(METASTORE);
         List<CatalogFactory> factories = new ArrayList<>();
diff --git a/flink-table-store-filesystems/flink-table-store-oss/pom.xml 
b/flink-table-store-filesystems/flink-table-store-oss/pom.xml
index f695af25..e1724b2b 100644
--- a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
+++ b/flink-table-store-filesystems/flink-table-store-oss/pom.xml
@@ -32,10 +32,32 @@
     <packaging>jar</packaging>
 
     <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-oss-fs-hadoop</artifactId>
             <version>${flink.version}</version>
+            <scope>runtime</scope>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
@@ -43,31 +65,26 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
+                <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>shade-oss</id>
-                        <phase>package</phase>
+                        <id>copy-oss-jar</id>
+                        <phase>prepare-package</phase>
                         <goals>
-                            <goal>shade</goal>
+                            <goal>copy</goal>
                         </goals>
                         <configuration>
-                            
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-                            
<shadedArtifactAttached>false</shadedArtifactAttached>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.commons.lang3</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.common.base</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.common.cache</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
-                                </relocation>
-                            </relocations>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    
<artifactId>flink-oss-fs-hadoop</artifactId>
+                                    <version>${flink.version}</version>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    
<destFileName>flink-table-store-plugin-oss.jar</destFileName>
+                                </artifactItem>
+                            </artifactItems>
+                            
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>
diff --git 
a/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
 
b/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
new file mode 100644
index 00000000..cce5c0df
--- /dev/null
+++ 
b/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.oss;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+import org.apache.flink.table.store.plugin.PluginLoader;
+
+/** A {@link PluginLoader} to load oss. */
+public class OSSLoader implements FileSystemLoader {
+
+    private static final String OSS_JAR = "flink-table-store-plugin-oss.jar";
+
+    private static final String OSS_CLASS = 
"org.apache.flink.fs.osshadoop.OSSFileSystemFactory";
+
+    // Singleton lazy initialization
+
+    private static PluginLoader loader;
+
+    private static synchronized PluginLoader getLoader() {
+        if (loader == null) {
+            // Avoid NoClassDefFoundError without cause by exception
+            loader = new PluginLoader(OSS_JAR);
+        }
+        return loader;
+    }
+
+    @Override
+    public FileSystemFactory load() {
+        return getLoader().newInstance(OSS_CLASS);
+    }
+}
diff --git 
a/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
 
b/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
new file mode 100644
index 00000000..ef76268b
--- /dev/null
+++ 
b/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.oss.OSSLoader
\ No newline at end of file
diff --git a/flink-table-store-filesystems/flink-table-store-oss/pom.xml 
b/flink-table-store-filesystems/flink-table-store-s3/pom.xml
similarity index 51%
copy from flink-table-store-filesystems/flink-table-store-oss/pom.xml
copy to flink-table-store-filesystems/flink-table-store-s3/pom.xml
index f695af25..6543bd60 100644
--- a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
+++ b/flink-table-store-filesystems/flink-table-store-s3/pom.xml
@@ -27,15 +27,37 @@
         <version>0.3-SNAPSHOT</version>
     </parent>
 
-    <artifactId>flink-table-store-oss</artifactId>
-    <name>Flink Table Store : FileSystems : OSS</name>
+    <artifactId>flink-table-store-s3</artifactId>
+    <name>Flink Table Store : FileSystems : S3</name>
     <packaging>jar</packaging>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>flink-oss-fs-hadoop</artifactId>
+            <artifactId>flink-core</artifactId>
             <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-s3-fs-hadoop</artifactId>
+            <version>${flink.version}</version>
+            <scope>runtime</scope>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
 
@@ -43,31 +65,26 @@
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
+                <artifactId>maven-dependency-plugin</artifactId>
                 <executions>
                     <execution>
-                        <id>shade-oss</id>
-                        <phase>package</phase>
+                        <id>copy-s3-jar</id>
+                        <phase>prepare-package</phase>
                         <goals>
-                            <goal>shade</goal>
+                            <goal>copy</goal>
                         </goals>
                         <configuration>
-                            
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-                            
<shadedArtifactAttached>false</shadedArtifactAttached>
-                            <relocations>
-                                <relocation>
-                                    <pattern>org.apache.commons.lang3</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.common.base</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
-                                </relocation>
-                                <relocation>
-                                    <pattern>com.google.common.cache</pattern>
-                                    
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
-                                </relocation>
-                            </relocations>
+                            <artifactItems>
+                                <artifactItem>
+                                    <groupId>org.apache.flink</groupId>
+                                    <artifactId>flink-s3-fs-hadoop</artifactId>
+                                    <version>${flink.version}</version>
+                                    <type>jar</type>
+                                    <overWrite>true</overWrite>
+                                    
<destFileName>flink-table-store-plugin-s3.jar</destFileName>
+                                </artifactItem>
+                            </artifactItems>
+                            
<outputDirectory>${project.build.directory}/classes</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>
diff --git 
a/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
 
b/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
new file mode 100644
index 00000000..31a1e2ea
--- /dev/null
+++ 
b/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.s3;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+import org.apache.flink.table.store.plugin.PluginLoader;
+
+/** A {@link PluginLoader} to load oss. */
+public class S3Loader implements FileSystemLoader {
+
+    private static final String S3_JAR = "flink-table-store-plugin-s3.jar";
+
+    private static final String S3_CLASS = 
"org.apache.flink.fs.s3hadoop.S3FileSystemFactory";
+
+    // Singleton lazy initialization
+
+    private static PluginLoader loader;
+
+    private static synchronized PluginLoader getLoader() {
+        if (loader == null) {
+            // Avoid NoClassDefFoundError without cause by exception
+            loader = new PluginLoader(S3_JAR);
+        }
+        return loader;
+    }
+
+    @Override
+    public FileSystemFactory load() {
+        return getLoader().newInstance(S3_CLASS);
+    }
+}
diff --git 
a/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
 
b/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
new file mode 100644
index 00000000..c3b4199e
--- /dev/null
+++ 
b/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.s3.S3Loader
\ No newline at end of file
diff --git a/flink-table-store-filesystems/pom.xml 
b/flink-table-store-filesystems/pom.xml
index d2dc00da..9a7be61b 100644
--- a/flink-table-store-filesystems/pom.xml
+++ b/flink-table-store-filesystems/pom.xml
@@ -33,6 +33,7 @@
 
     <modules>
         <module>flink-table-store-oss</module>
+        <module>flink-table-store-s3</module>
     </modules>
 
 </project>
diff --git 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 8a26404d..6b5ab2f5 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.table.store.hive;
 
-import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.store.file.schema.DataField;
 import org.apache.flink.table.store.file.schema.SchemaManager;
 import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.hadoop.conf.Configuration;
@@ -80,15 +79,15 @@ public class HiveSchema {
                             + ". Currently Flink table store only supports 
external table for Hive "
                             + "so location property must be set.");
         }
+        Path path = new Path(location);
         if (configuration != null) {
             org.apache.flink.configuration.Configuration flinkConf =
                     org.apache.flink.configuration.Configuration.fromMap(
                             getPropsWithPrefix(configuration, 
TABLE_STORE_PREFIX));
-            FileSystem.initialize(
-                    flinkConf, 
PluginUtils.createPluginManagerFromRootFolder(flinkConf));
+            FileSystems.initialize(path, flinkConf);
         }
         TableSchema tableSchema =
-                new SchemaManager(new Path(location))
+                new SchemaManager(path)
                         .latest()
                         .orElseThrow(
                                 () ->
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 06cf79d3..db0c734a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -35,6 +35,7 @@ under the License.
 
     <properties>
         <spark.version>3.2.2</spark.version>
+        <aws.version>1.12.319</aws.version>
     </properties>
 
     <dependencies>
@@ -60,6 +61,36 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <!-- Test -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>${flink.test.utils}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-store-s3</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-core</artifactId>
+            <version>${aws.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk-s3</artifactId>
+            <version>${aws.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <!-- Activate these profiles with -Pspark-x.x to build and test against 
different Spark versions -->
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 234ddd6b..9712b709 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,14 +19,13 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogFactory;
 import org.apache.flink.table.store.file.operation.Lock;
 import org.apache.flink.table.store.file.schema.SchemaChange;
 import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.Preconditions;
 
@@ -76,8 +75,7 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces {
         this.name = name;
         Configuration configuration =
                 
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
-        FileSystem.initialize(
-                configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+        FileSystems.initialize(CatalogFactory.warehouse(configuration), 
configuration);
         this.catalog = CatalogFactory.createCatalog(configuration);
     }
 
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 8be455fb..67e3c7ec 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,9 +19,9 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.CoreOptions;
 import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
@@ -68,8 +68,7 @@ public class SparkSource implements DataSourceRegister, 
SessionConfigSupport {
             StructType schema, Transform[] partitioning, Map<String, String> 
options) {
         Configuration configuration =
                 
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
-        FileSystem.initialize(
-                configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+        FileSystems.initialize(CoreOptions.path(options), configuration);
         return new SparkTable(
                 FileStoreTableFactory.create(Configuration.fromMap(options)), 
Lock.emptyFactory());
     }
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
new file mode 100644
index 00000000..8c234985
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/** {@code MinioTestContainer} provides a {@code Minio} test instance. */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+    private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+    private static final int DEFAULT_PORT = 9000;
+
+    private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+    private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+    private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+    private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+    private final String accessKey;
+    private final String secretKey;
+    private final String defaultBucketName;
+
+    public MinioTestContainer() {
+        this(randomString("bucket", 6));
+    }
+
+    public MinioTestContainer(String defaultBucketName) {
+        super(DockerImageVersions.MINIO);
+
+        this.accessKey = randomString("accessKey", 10);
+        // secrets must have at least 8 characters
+        this.secretKey = randomString("secret", 10);
+        this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+        withNetworkAliases(randomString("minio", 6));
+        addExposedPort(DEFAULT_PORT);
+        withEnv(MINIO_ACCESS_KEY, this.accessKey);
+        withEnv(MINIO_SECRET_KEY, this.secretKey);
+        withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+        setWaitStrategy(
+                new HttpWaitStrategy()
+                        .forPort(DEFAULT_PORT)
+                        .forPath(HEALTH_ENDPOINT)
+                        .withStartupTimeout(Duration.ofMinutes(2)));
+    }
+
+    @Override
+    protected void containerIsStarted(InspectContainerResponse containerInfo) {
+        super.containerIsStarted(containerInfo);
+        createDefaultBucket();
+    }
+
+    private static String randomString(String prefix, int length) {
+        return String.format("%s-%s", prefix, 
Base58.randomString(length).toLowerCase(Locale.ROOT));
+    }
+
+    /** Creates {@link AmazonS3} client for accessing the {@code Minio} 
instance. */
+    private AmazonS3 getClient() {
+        return AmazonS3Client.builder()
+                .withCredentials(
+                        new AWSStaticCredentialsProvider(
+                                new BasicAWSCredentials(accessKey, secretKey)))
+                .withPathStyleAccessEnabled(true)
+                .withEndpointConfiguration(
+                        new AwsClientBuilder.EndpointConfiguration(
+                                getHttpEndpoint(), "unused-region"))
+                .build();
+    }
+
+    private String getHttpEndpoint() {
+        return String.format("http://%s:%s";, getHost(), 
getMappedPort(DEFAULT_PORT));
+    }
+
+    public Configuration getS3ConfigOptions() {
+        Configuration config = new Configuration();
+        config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+        config.setString("s3.path.style.access", "true");
+        config.setString("s3.access.key", accessKey);
+        config.setString("s3.secret.key", secretKey);
+        return config;
+    }
+
+    private void createDefaultBucket() {
+        getClient().createBucket(defaultBucketName);
+    }
+
+    /**
+     * Returns the S3 URI for the default bucket. This can be used to create 
the HA storage
+     * directory path.
+     */
+    public String getS3UriForDefaultBucket() {
+        return "s3://" + defaultBucketName;
+    }
+}
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
new file mode 100644
index 00000000..88af7f2e
--- /dev/null
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for using S3 in Spark. */
+public class SparkS3ITCase {
+
+    @ClassRule public static final MinioTestContainer MINIO_CONTAINER = new 
MinioTestContainer();
+
+    private static SparkSession spark = null;
+
+    @BeforeClass
+    public static void startMetastoreAndSpark() {
+        String path = MINIO_CONTAINER.getS3UriForDefaultBucket() + "/" + 
UUID.randomUUID();
+        Path warehousePath = new Path(path);
+        spark = SparkSession.builder().master("local[2]").getOrCreate();
+        spark.conf().set("spark.sql.catalog.tablestore", 
SparkCatalog.class.getName());
+        spark.conf().set("spark.sql.catalog.tablestore.warehouse", 
warehousePath.toString());
+        MINIO_CONTAINER
+                .getS3ConfigOptions()
+                .toMap()
+                .forEach((k, v) -> 
spark.conf().set("spark.sql.catalog.tablestore." + k, v));
+        spark.sql("CREATE DATABASE tablestore.db");
+        spark.sql("USE tablestore.db");
+    }
+
+    @AfterClass
+    public static void afterEach() {
+        spark.sql("DROP TABLE T");
+    }
+
+    @Test
+    public void testWriteRead() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+                        + " ('primary-key'='a', 'bucket'='4', 
'file.format'='avro')");
+        spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
+    }
+}
diff --git 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
 
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 2050b63a..46685e48 100644
--- 
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++ 
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,8 +19,8 @@
 package org.apache.flink.table.store.spark;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.filesystem.FileSystems;
 import org.apache.flink.table.store.table.FileStoreTableFactory;
 
 import org.apache.spark.sql.sources.DataSourceRegister;
@@ -50,8 +50,7 @@ public class SparkSource implements DataSourceRegister, 
ReadSupport, SessionConf
     public DataSourceReader createReader(DataSourceOptions options) {
         Configuration configuration =
                 
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
-        FileSystem.initialize(
-                configuration, 
PluginUtils.createPluginManagerFromRootFolder(configuration));
+        FileSystems.initialize(CoreOptions.path(configuration), configuration);
         return new 
SparkDataSourceReader(FileStoreTableFactory.create(configuration));
     }
 

Reply via email to