This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 54fd3ab2 [FLINK-30398] Introduce S3 support for table store
54fd3ab2 is described below
commit 54fd3ab2d16999d628faf4ff62c701b12ca20725
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 14 15:43:33 2022 +0800
[FLINK-30398] Introduce S3 support for table store
This closes #436.
---
docs/content/docs/filesystems/overview.md | 3 +-
docs/content/docs/filesystems/s3.md | 70 ++++++++++++
.../flink/table/store/codegen/CodeGenLoader.java | 116 ++-----------------
.../flink/table/store/filesystem/FileSystems.java | 81 +++++++++++++
.../table/store/plugin}/ComponentClassLoader.java | 2 +-
.../flink/table/store/plugin/FileSystemLoader.java | 27 +++++
.../flink/table/store/plugin/PluginLoader.java | 49 ++++----
.../flink/table/store/codegen/CodeGenUtils.java | 10 +-
.../table/store/file/catalog/CatalogFactory.java | 13 ++-
.../flink-table-store-oss/pom.xml | 57 ++++++----
.../apache/flink/table/store/oss/OSSLoader.java | 48 ++++++++
...pache.flink.table.store.plugin.FileSystemLoader | 16 +++
.../pom.xml | 63 +++++++----
.../org/apache/flink/table/store/s3/S3Loader.java | 48 ++++++++
...pache.flink.table.store.plugin.FileSystemLoader | 16 +++
flink-table-store-filesystems/pom.xml | 1 +
.../apache/flink/table/store/hive/HiveSchema.java | 9 +-
flink-table-store-spark/pom.xml | 31 +++++
.../flink/table/store/spark/SparkCatalog.java | 6 +-
.../flink/table/store/spark/SparkSource.java | 7 +-
.../table/store/spark/MinioTestContainer.java | 126 +++++++++++++++++++++
.../flink/table/store/spark/SparkS3ITCase.java | 71 ++++++++++++
.../flink/table/store/spark/SparkSource.java | 7 +-
23 files changed, 671 insertions(+), 206 deletions(-)
diff --git a/docs/content/docs/filesystems/overview.md
b/docs/content/docs/filesystems/overview.md
index 232801d7..82be82fb 100644
--- a/docs/content/docs/filesystems/overview.md
+++ b/docs/content/docs/filesystems/overview.md
@@ -39,4 +39,5 @@ FileSystem pluggable jars for user to query tables from
Spark/Hive side.
|:------------------|:-----------------|-----------|:--------------------------------|
| Local File System | file:// | N | Built-in Support
|
| HDFS | hdfs:// | N | Built-in Support, ensure
that the cluster is in the hadoop environment |
-| Aliyun OSS | oss:// | Y | Tested on Spark3.3 and
Hive 3.1 |
+| Aliyun OSS | oss:// | Y | |
+| S3 | s3:// | Y | |
diff --git a/docs/content/docs/filesystems/s3.md
b/docs/content/docs/filesystems/s3.md
new file mode 100644
index 00000000..d62148cb
--- /dev/null
+++ b/docs/content/docs/filesystems/s3.md
@@ -0,0 +1,70 @@
+---
+title: "S3"
+weight: 3
+type: docs
+aliases:
+- /filesystems/s3.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# S3
+
+## Build
+
+[Build Flink Table Store]({{< ref "docs/engines/build" >}}), then you can find the
shaded jars under
+`./flink-table-store-filesystems/flink-table-store-s3/target/flink-table-store-s3-{{<
version >}}.jar`.
+
+## Use
+
+- On Flink side, configure `flink-conf.yaml` like
+ ```yaml
+ s3.endpoint: your-endpoint-hostname
+ s3.access-key: xxx
+ s3.secret-key: yyy
+ ```
+
+- On Spark side, place `flink-table-store-s3-{{< version >}}.jar` together
with `flink-table-store-spark-{{< version >}}.jar` under Spark's jars
directory, and start like
+ - Spark SQL
+ ```shell
+ spark-sql \
+ --conf
spark.sql.catalog.tablestore=org.apache.flink.table.store.spark.SparkCatalog \
+ --conf spark.sql.catalog.tablestore.warehouse=s3://<bucket>/<path> \
+ --conf spark.sql.catalog.tablestore.s3.endpoint=your-endpoint-hostname \
+ --conf spark.sql.catalog.tablestore.s3.access-key=xxx \
+ --conf spark.sql.catalog.tablestore.s3.secret-key=yyy
+ ```
+- On Hive side, place `flink-table-store-s3-{{< version >}}.jar` together with
`flink-table-store-hive-connector-{{< version >}}.jar` under Hive's auxlib
directory, and start like
+ - Hive Catalog
+ ```sql
+ SET tablestore.s3.endpoint=your-endpoint-hostname;
+ SET tablestore.s3.access-key=xxx;
+ SET tablestore.s3.secret-key=yyy;
+
+ CREATE EXTERNAL TABLE external_test_table
+ STORED BY 'org.apache.flink.table.store.hive.TableStoreHiveStorageHandler'
+ LOCATION 's3://<bucket>/<path>';
+ ```
+
+## Non-S3
+The S3 Filesystem also supports using S3-compliant object stores such as IBM’s
+Cloud Object Storage and MinIO. To do so, configure your endpoint.
+ ```yaml
+ s3.endpoint: your-endpoint-hostname
+ ```
diff --git
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
index 6bba6a4c..bf0ca863 100644
---
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
+++
b/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
@@ -18,124 +18,26 @@
package org.apache.flink.table.store.codegen;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ConfigurationUtils;
-import org.apache.flink.table.store.utils.LocalFileUtils;
-import org.apache.flink.util.IOUtils;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.ServiceLoader;
-import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import org.apache.flink.table.store.plugin.PluginLoader;
/** Copied and modified from the flink-table-planner-loader module. */
public class CodeGenLoader {
- static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR =
"flink-table-store-codegen.jar";
-
- public static final String[] PARENT_FIRST_LOGGING_PATTERNS =
- new String[] {
- "org.slf4j",
- "org.apache.log4j",
- "org.apache.logging",
- "org.apache.commons.logging",
- "ch.qos.logback"
- };
-
- private static final String[] OWNER_CLASSPATH =
- Stream.concat(
- Arrays.stream(PARENT_FIRST_LOGGING_PATTERNS),
- Stream.of(
- // These packages are shipped either by
- // flink-table-runtime or flink-dist itself
- "org.codehaus.janino",
- "org.codehaus.commons",
- "org.apache.commons.lang3"))
- .toArray(String[]::new);
-
- private static final String[] COMPONENT_CLASSPATH = new String[]
{"org.apache.flink"};
-
- private final ClassLoader submoduleClassLoader;
-
- private CodeGenLoader() {
- try {
- final ClassLoader flinkClassLoader =
CodeGenLoader.class.getClassLoader();
- final Path tmpDirectory =
- Paths.get(ConfigurationUtils.parseTempDirectories(new
Configuration())[0]);
- Files.createDirectories(
-
LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
- Path delegateJar =
- extractResource(
- FLINK_TABLE_STORE_CODEGEN_FAT_JAR,
- flinkClassLoader,
- tmpDirectory,
- "Flink table store codegen could not be found.\n"
- + "If you're running a test, please make
sure you've built the codegen modules by running\n"
- + "mvn clean package -pl
flink-table-store-codegen,flink-table-store-codegen-loader -DskipTests");
- this.submoduleClassLoader =
- new ComponentClassLoader(
- new URL[] {delegateJar.toUri().toURL()},
- flinkClassLoader,
- OWNER_CLASSPATH,
- COMPONENT_CLASSPATH);
- } catch (IOException e) {
- throw new RuntimeException(
- "Could not initialize the flink-table-store-codegen
loader.", e);
- }
- }
-
- private Path extractResource(
- String resourceName,
- ClassLoader flinkClassLoader,
- Path tmpDirectory,
- String errorMessage)
- throws IOException {
- String[] splitName = resourceName.split("\\.");
- splitName[0] += "_" + UUID.randomUUID();
- final Path tempFile =
Files.createFile(tmpDirectory.resolve(String.join(".", splitName)));
- final InputStream resourceStream =
flinkClassLoader.getResourceAsStream(resourceName);
- if (resourceStream == null) {
- throw new RuntimeException(errorMessage);
- }
- IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
- return tempFile;
- }
+ private static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR =
"flink-table-store-codegen.jar";
// Singleton lazy initialization
- private static CodeGenLoader instance;
+ private static PluginLoader loader;
- public static synchronized CodeGenLoader getInstance() {
- if (instance == null) {
+ private static synchronized PluginLoader getLoader() {
+ if (loader == null) {
// Avoid NoClassDefFoundError without cause by exception
- instance = new CodeGenLoader();
+ loader = new PluginLoader(FLINK_TABLE_STORE_CODEGEN_FAT_JAR);
}
- return instance;
+ return loader;
}
- public <T> T discover(Class<T> clazz) {
- List<T> results = new ArrayList<>();
- ServiceLoader.load(clazz,
submoduleClassLoader).iterator().forEachRemaining(results::add);
- if (results.size() != 1) {
- throw new RuntimeException(
- "Found "
- + results.size()
- + " classes implementing "
- + clazz.getName()
- + ". They are:\n"
- + results.stream()
- .map(t -> t.getClass().getName())
- .collect(Collectors.joining("\n")));
- }
- return results.get(0);
+ public static CodeGenerator getCodeGenerator() {
+ return getLoader().discover(CodeGenerator.class);
}
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
new file mode 100644
index 00000000..cdc10ffe
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/filesystem/FileSystems.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ServiceLoader;
+
+/**
+ * A new {@link FileSystem} loader to support:
+ *
+ * <ul>
+ * <li>Support access to file systems in Hive, Spark, Trino and other
computing engines.
+ * <li>TODO: Support access to multiple users' file systems in Flink
clusters.
+ * </ul>
+ */
+public class FileSystems {
+
+ public static void initialize(Path path, Configuration configuration) {
+ // 1. Try to load file system
+ try {
+ // check can obtain
+ FileSystem fs = path.getFileSystem();
+ // check can read
+ fs.getFileStatus(path);
+ return;
+ } catch (IOException ignored) {
+ }
+
+ // 2. initialize
+ FileSystem.initialize(
+ configuration,
+ new PluginManager() {
+ @Override
+ public <P> Iterator<P> load(Class<P> service) {
+ return (Iterator<P>) discoverFactories().iterator();
+ }
+ });
+
+ try {
+ SecurityUtils.install(new SecurityConfiguration(configuration));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static List<FileSystemFactory> discoverFactories() {
+ List<FileSystemFactory> results = new ArrayList<>();
+ ServiceLoader.load(FileSystemLoader.class,
FileSystemLoader.class.getClassLoader())
+ .iterator()
+ .forEachRemaining(loader -> results.add(loader.load()));
+ return results;
+ }
+}
diff --git
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
similarity index 99%
rename from
flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
rename to
flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
index 052eb5d2..3f43ca5c 100644
---
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/ComponentClassLoader.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/ComponentClassLoader.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.codegen;
+package org.apache.flink.table.store.plugin;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.function.FunctionWithException;
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
new file mode 100644
index 00000000..c06b52ad
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/FileSystemLoader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.plugin;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+
+/** Loader to load {@link FileSystemFactory}. */
+public interface FileSystemLoader {
+
+ FileSystemFactory load();
+}
diff --git
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
similarity index 79%
copy from
flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
copy to
flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
index 6bba6a4c..bd2d24a7 100644
---
a/flink-table-store-codegen-loader/src/main/java/org/apache/flink/table/store/codegen/CodeGenLoader.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/plugin/PluginLoader.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.codegen;
+package org.apache.flink.table.store.plugin;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
@@ -37,10 +37,8 @@ import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-/** Copied and modified from the flink-table-planner-loader module. */
-public class CodeGenLoader {
-
- static final String FLINK_TABLE_STORE_CODEGEN_FAT_JAR =
"flink-table-store-codegen.jar";
+/** Loader to load plugin jar. */
+public class PluginLoader {
public static final String[] PARENT_FIRST_LOGGING_PATTERNS =
new String[] {
@@ -66,25 +64,27 @@ public class CodeGenLoader {
private final ClassLoader submoduleClassLoader;
- private CodeGenLoader() {
+ public PluginLoader(String jarName) {
try {
- final ClassLoader flinkClassLoader =
CodeGenLoader.class.getClassLoader();
- final Path tmpDirectory =
+ ClassLoader ownerClassLoader = PluginLoader.class.getClassLoader();
+ Path tmpDirectory =
Paths.get(ConfigurationUtils.parseTempDirectories(new
Configuration())[0]);
Files.createDirectories(
LocalFileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
Path delegateJar =
extractResource(
- FLINK_TABLE_STORE_CODEGEN_FAT_JAR,
- flinkClassLoader,
+ jarName,
+ ownerClassLoader,
tmpDirectory,
- "Flink table store codegen could not be found.\n"
- + "If you're running a test, please make
sure you've built the codegen modules by running\n"
- + "mvn clean package -pl
flink-table-store-codegen,flink-table-store-codegen-loader -DskipTests");
+ String.format(
+ "%s could not be found.\n"
+ + "If you're running a test,
please make sure you've built the modules by running\n"
+ + "mvn clean install -DskipTests",
+ jarName));
this.submoduleClassLoader =
new ComponentClassLoader(
new URL[] {delegateJar.toUri().toURL()},
- flinkClassLoader,
+ ownerClassLoader,
OWNER_CLASSPATH,
COMPONENT_CLASSPATH);
} catch (IOException e) {
@@ -110,18 +110,6 @@ public class CodeGenLoader {
return tempFile;
}
- // Singleton lazy initialization
-
- private static CodeGenLoader instance;
-
- public static synchronized CodeGenLoader getInstance() {
- if (instance == null) {
- // Avoid NoClassDefFoundError without cause by exception
- instance = new CodeGenLoader();
- }
- return instance;
- }
-
public <T> T discover(Class<T> clazz) {
List<T> results = new ArrayList<>();
ServiceLoader.load(clazz,
submoduleClassLoader).iterator().forEachRemaining(results::add);
@@ -138,4 +126,13 @@ public class CodeGenLoader {
}
return results.get(0);
}
+
+ @SuppressWarnings("unchecked")
+ public <T> T newInstance(String name) {
+ try {
+ return (T) submoduleClassLoader.loadClass(name).newInstance();
+ } catch (ClassNotFoundException | InstantiationException |
IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
index c8775012..a7175fe9 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/codegen/CodeGenUtils.java
@@ -34,25 +34,21 @@ public class CodeGenUtils {
return EMPTY_PROJECTION;
}
- return CodeGenLoader.getInstance()
- .discover(CodeGenerator.class)
+ return CodeGenLoader.getCodeGenerator()
.generateProjection("Projection", inputType, mapping)
.newInstance(CodeGenUtils.class.getClassLoader());
}
public static NormalizedKeyComputer newNormalizedKeyComputer(
List<LogicalType> fieldTypes, String name) {
- return CodeGenLoader.getInstance()
- .discover(CodeGenerator.class)
+ return CodeGenLoader.getCodeGenerator()
.generateNormalizedKeyComputer(fieldTypes, name)
.newInstance(CodeGenUtils.class.getClassLoader());
}
public static GeneratedClass<RecordComparator> generateRecordComparator(
List<LogicalType> fieldTypes, String name) {
- return CodeGenLoader.getInstance()
- .discover(CodeGenerator.class)
- .generateRecordComparator(fieldTypes, name);
+ return
CodeGenLoader.getCodeGenerator().generateRecordComparator(fieldTypes, name);
}
public static RecordComparator newRecordComparator(List<LogicalType>
fieldTypes, String name) {
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index de6c97ab..66129710 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -41,14 +41,19 @@ public interface CatalogFactory {
Catalog create(String warehouse, Configuration options);
- static Catalog createCatalog(Configuration options) {
- // manual validation
- // because different catalog types may have different options
- // we can't list them all in the optionalOptions() method
+ static Path warehouse(Configuration options) {
String warehouse =
Preconditions.checkNotNull(
options.get(WAREHOUSE),
"Table store '" + WAREHOUSE.key() + "' path must be
set");
+ return new Path(warehouse);
+ }
+
+ static Catalog createCatalog(Configuration options) {
+ // manual validation
+ // because different catalog types may have different options
+ // we can't list them all in the optionalOptions() method
+ String warehouse = warehouse(options).toUri().toString();
String metastore = options.get(METASTORE);
List<CatalogFactory> factories = new ArrayList<>();
diff --git a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
b/flink-table-store-filesystems/flink-table-store-oss/pom.xml
index f695af25..e1724b2b 100644
--- a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
+++ b/flink-table-store-filesystems/flink-table-store-oss/pom.xml
@@ -32,10 +32,32 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-oss-fs-hadoop</artifactId>
<version>${flink.version}</version>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
@@ -43,31 +65,26 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
+ <artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
- <id>shade-oss</id>
- <phase>package</phase>
+ <id>copy-oss-jar</id>
+ <phase>prepare-package</phase>
<goals>
- <goal>shade</goal>
+ <goal>copy</goal>
</goals>
<configuration>
-
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-
<shadedArtifactAttached>false</shadedArtifactAttached>
- <relocations>
- <relocation>
- <pattern>org.apache.commons.lang3</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.base</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.cache</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
- </relocation>
- </relocations>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-oss-fs-hadoop</artifactId>
+ <version>${flink.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+
<destFileName>flink-table-store-plugin-oss.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
</executions>
diff --git
a/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
b/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
new file mode 100644
index 00000000..cce5c0df
--- /dev/null
+++
b/flink-table-store-filesystems/flink-table-store-oss/src/main/java/org/apache/flink/table/store/oss/OSSLoader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.oss;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+import org.apache.flink.table.store.plugin.PluginLoader;
+
+/** A {@link PluginLoader} to load oss. */
+public class OSSLoader implements FileSystemLoader {
+
+ private static final String OSS_JAR = "flink-table-store-plugin-oss.jar";
+
+ private static final String OSS_CLASS =
"org.apache.flink.fs.osshadoop.OSSFileSystemFactory";
+
+ // Singleton lazy initialization
+
+ private static PluginLoader loader;
+
+ private static synchronized PluginLoader getLoader() {
+ if (loader == null) {
+ // Avoid NoClassDefFoundError without cause by exception
+ loader = new PluginLoader(OSS_JAR);
+ }
+ return loader;
+ }
+
+ @Override
+ public FileSystemFactory load() {
+ return getLoader().newInstance(OSS_CLASS);
+ }
+}
diff --git
a/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
b/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
new file mode 100644
index 00000000..ef76268b
--- /dev/null
+++
b/flink-table-store-filesystems/flink-table-store-oss/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.oss.OSSLoader
\ No newline at end of file
diff --git a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
b/flink-table-store-filesystems/flink-table-store-s3/pom.xml
similarity index 51%
copy from flink-table-store-filesystems/flink-table-store-oss/pom.xml
copy to flink-table-store-filesystems/flink-table-store-s3/pom.xml
index f695af25..6543bd60 100644
--- a/flink-table-store-filesystems/flink-table-store-oss/pom.xml
+++ b/flink-table-store-filesystems/flink-table-store-s3/pom.xml
@@ -27,15 +27,37 @@
<version>0.3-SNAPSHOT</version>
</parent>
- <artifactId>flink-table-store-oss</artifactId>
- <name>Flink Table Store : FileSystems : OSS</name>
+ <artifactId>flink-table-store-s3</artifactId>
+ <name>Flink Table Store : FileSystems : S3</name>
<packaging>jar</packaging>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>flink-oss-fs-hadoop</artifactId>
+ <artifactId>flink-core</artifactId>
<version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-common</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-s3-fs-hadoop</artifactId>
+ <version>${flink.version}</version>
+ <scope>runtime</scope>
+ <optional>true</optional>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
@@ -43,31 +65,26 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
+ <artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
- <id>shade-oss</id>
- <phase>package</phase>
+ <id>copy-s3-jar</id>
+ <phase>prepare-package</phase>
<goals>
- <goal>shade</goal>
+ <goal>copy</goal>
</goals>
<configuration>
-
<dependencyReducedPomLocation>${project.basedir}/target/dependency-reduced-pom.xml</dependencyReducedPomLocation>
-
<shadedArtifactAttached>false</shadedArtifactAttached>
- <relocations>
- <relocation>
- <pattern>org.apache.commons.lang3</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.org.apache.commons.lang3</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.base</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.base</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google.common.cache</pattern>
-
<shadedPattern>org.apache.flink.table.store.shaded.com.google.common.cache</shadedPattern>
- </relocation>
- </relocations>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-s3-fs-hadoop</artifactId>
+ <version>${flink.version}</version>
+ <type>jar</type>
+ <overWrite>true</overWrite>
+
<destFileName>flink-table-store-plugin-s3.jar</destFileName>
+ </artifactItem>
+ </artifactItems>
+
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
</executions>
diff --git
a/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
b/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
new file mode 100644
index 00000000..31a1e2ea
--- /dev/null
+++
b/flink-table-store-filesystems/flink-table-store-s3/src/main/java/org/apache/flink/table/store/s3/S3Loader.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.s3;
+
+import org.apache.flink.core.fs.FileSystemFactory;
+import org.apache.flink.table.store.plugin.FileSystemLoader;
+import org.apache.flink.table.store.plugin.PluginLoader;
+
+/** A {@link FileSystemLoader} that loads the S3 file system from a bundled plugin jar. */
+public class S3Loader implements FileSystemLoader {
+
+ // Jar bundled into this module's classes/ dir at build time (see maven-dependency-plugin
+ // copy of flink-s3-fs-hadoop in the pom); loaded through an isolated plugin class loader.
+ private static final String S3_JAR = "flink-table-store-plugin-s3.jar";
+
+ // Fully-qualified factory class inside the plugin jar; instantiated reflectively.
+ private static final String S3_CLASS =
"org.apache.flink.fs.s3hadoop.S3FileSystemFactory";
+
+ // Singleton lazy initialization
+
+ private static PluginLoader loader;
+
+ // Builds the shared PluginLoader on first use; synchronized so at most one is created.
+ private static synchronized PluginLoader getLoader() {
+ if (loader == null) {
+ // Lazy creation so a missing/broken jar surfaces as a clear exception here,
+ // not as a NoClassDefFoundError without cause by exception
+ loader = new PluginLoader(S3_JAR);
+ }
+ return loader;
+ }
+
+ @Override
+ public FileSystemFactory load() {
+ return getLoader().newInstance(S3_CLASS);
+ }
+}
diff --git
a/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
b/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
new file mode 100644
index 00000000..c3b4199e
--- /dev/null
+++
b/flink-table-store-filesystems/flink-table-store-s3/src/main/resources/META-INF/services/org.apache.flink.table.store.plugin.FileSystemLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.table.store.s3.S3Loader
\ No newline at end of file
diff --git a/flink-table-store-filesystems/pom.xml
b/flink-table-store-filesystems/pom.xml
index d2dc00da..9a7be61b 100644
--- a/flink-table-store-filesystems/pom.xml
+++ b/flink-table-store-filesystems/pom.xml
@@ -33,6 +33,7 @@
<modules>
<module>flink-table-store-oss</module>
+ <module>flink-table-store-s3</module>
</modules>
</project>
diff --git
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
index 8a26404d..6b5ab2f5 100644
---
a/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
+++
b/flink-table-store-hive/flink-table-store-hive-connector/src/main/java/org/apache/flink/table/store/hive/HiveSchema.java
@@ -18,12 +18,11 @@
package org.apache.flink.table.store.hive;
-import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.store.file.schema.DataField;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.hadoop.conf.Configuration;
@@ -80,15 +79,15 @@ public class HiveSchema {
+ ". Currently Flink table store only supports
external table for Hive "
+ "so location property must be set.");
}
+ Path path = new Path(location);
if (configuration != null) {
org.apache.flink.configuration.Configuration flinkConf =
org.apache.flink.configuration.Configuration.fromMap(
getPropsWithPrefix(configuration,
TABLE_STORE_PREFIX));
- FileSystem.initialize(
- flinkConf,
PluginUtils.createPluginManagerFromRootFolder(flinkConf));
+ FileSystems.initialize(path, flinkConf);
}
TableSchema tableSchema =
- new SchemaManager(new Path(location))
+ new SchemaManager(path)
.latest()
.orElseThrow(
() ->
diff --git a/flink-table-store-spark/pom.xml b/flink-table-store-spark/pom.xml
index 06cf79d3..db0c734a 100644
--- a/flink-table-store-spark/pom.xml
+++ b/flink-table-store-spark/pom.xml
@@ -35,6 +35,7 @@ under the License.
<properties>
<spark.version>3.2.2</spark.version>
+ <aws.version>1.12.319</aws.version>
</properties>
<dependencies>
@@ -60,6 +61,36 @@ under the License.
</exclusion>
</exclusions>
</dependency>
+
+ <!-- Test -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>${flink.test.utils}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-store-s3</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-core</artifactId>
+ <version>${aws.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-s3</artifactId>
+ <version>${aws.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<!-- Activate these profiles with -Pspark-x.x to build and test against
different Spark versions -->
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 234ddd6b..9712b709 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -19,14 +19,13 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogFactory;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.file.schema.SchemaChange;
import org.apache.flink.table.store.file.schema.UpdateSchema;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
@@ -76,8 +75,7 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
this.name = name;
Configuration configuration =
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
- FileSystem.initialize(
- configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
+ FileSystems.initialize(CatalogFactory.warehouse(configuration),
configuration);
this.catalog = CatalogFactory.createCatalog(configuration);
}
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 8be455fb..67e3c7ec 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,9 +19,9 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.operation.Lock;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.connector.catalog.SessionConfigSupport;
@@ -68,8 +68,7 @@ public class SparkSource implements DataSourceRegister,
SessionConfigSupport {
StructType schema, Transform[] partitioning, Map<String, String>
options) {
Configuration configuration =
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
- FileSystem.initialize(
- configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
+ FileSystems.initialize(CoreOptions.path(options), configuration);
return new SparkTable(
FileStoreTableFactory.create(Configuration.fromMap(options)),
Lock.emptyFactory());
}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
new file mode 100644
index 00000000..8c234985
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/MinioTestContainer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.DockerImageVersions;
+import org.apache.flink.util.Preconditions;
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.client.builder.AwsClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.utility.Base58;
+
+import java.time.Duration;
+import java.util.Locale;
+
+/** {@code MinioTestContainer} provides a {@code Minio} test instance. */
+public class MinioTestContainer extends GenericContainer<MinioTestContainer> {
+
+ // Flink config key the S3 filesystem reads to find the (fake) S3 endpoint.
+ private static final String FLINK_CONFIG_S3_ENDPOINT = "s3.endpoint";
+
+ // Port Minio listens on inside the container; mapped to a random host port.
+ private static final int DEFAULT_PORT = 9000;
+
+ // Env var names Minio uses for its root credentials.
+ private static final String MINIO_ACCESS_KEY = "MINIO_ROOT_USER";
+ private static final String MINIO_SECRET_KEY = "MINIO_ROOT_PASSWORD";
+
+ private static final String DEFAULT_STORAGE_DIRECTORY = "/data";
+ private static final String HEALTH_ENDPOINT = "/minio/health/ready";
+
+ // Randomly generated per container instance; exposed to tests via getS3ConfigOptions().
+ private final String accessKey;
+ private final String secretKey;
+ private final String defaultBucketName;
+
+ // Creates a container with a random default bucket name.
+ public MinioTestContainer() {
+ this(randomString("bucket", 6));
+ }
+
+ // Creates a container that will provision the given bucket once started.
+ public MinioTestContainer(String defaultBucketName) {
+ super(DockerImageVersions.MINIO);
+
+ this.accessKey = randomString("accessKey", 10);
+ // secrets must have at least 8 characters
+ this.secretKey = randomString("secret", 10);
+ this.defaultBucketName = Preconditions.checkNotNull(defaultBucketName);
+
+ withNetworkAliases(randomString("minio", 6));
+ addExposedPort(DEFAULT_PORT);
+ withEnv(MINIO_ACCESS_KEY, this.accessKey);
+ withEnv(MINIO_SECRET_KEY, this.secretKey);
+ withCommand("server", DEFAULT_STORAGE_DIRECTORY);
+ // Consider the container ready only after Minio's readiness probe responds.
+ setWaitStrategy(
+ new HttpWaitStrategy()
+ .forPort(DEFAULT_PORT)
+ .forPath(HEALTH_ENDPOINT)
+ .withStartupTimeout(Duration.ofMinutes(2)));
+ }
+
+ // Testcontainers lifecycle hook: provision the default bucket once the server is up.
+ @Override
+ protected void containerIsStarted(InspectContainerResponse containerInfo) {
+ super.containerIsStarted(containerInfo);
+ createDefaultBucket();
+ }
+
+ // Returns "<prefix>-<random base58>", lowercased (bucket names must be lowercase).
+ private static String randomString(String prefix, int length) {
+ return String.format("%s-%s", prefix,
Base58.randomString(length).toLowerCase(Locale.ROOT));
+ }
+
+ /** Creates {@link AmazonS3} client for accessing the {@code Minio}
instance. */
+ private AmazonS3 getClient() {
+ return AmazonS3Client.builder()
+ .withCredentials(
+ new AWSStaticCredentialsProvider(
+ new BasicAWSCredentials(accessKey, secretKey)))
+ .withPathStyleAccessEnabled(true)
+ .withEndpointConfiguration(
+ new AwsClientBuilder.EndpointConfiguration(
+ getHttpEndpoint(), "unused-region"))
+ .build();
+ }
+
+ // Host-visible endpoint URL using the dynamically mapped port.
+ private String getHttpEndpoint() {
+ return String.format("http://%s:%s", getHost(),
getMappedPort(DEFAULT_PORT));
+ }
+
+ // Flink s3 options (endpoint, path-style access, credentials) pointing at this container.
+ public Configuration getS3ConfigOptions() {
+ Configuration config = new Configuration();
+ config.setString(FLINK_CONFIG_S3_ENDPOINT, getHttpEndpoint());
+ config.setString("s3.path.style.access", "true");
+ config.setString("s3.access.key", accessKey);
+ config.setString("s3.secret.key", secretKey);
+ return config;
+ }
+
+ private void createDefaultBucket() {
+ getClient().createBucket(defaultBucketName);
+ }
+
+ /**
+ * Returns the S3 URI for the default bucket. This can be used to create
the HA storage
+ * directory path.
+ */
+ public String getS3UriForDefaultBucket() {
+ return "s3://" + defaultBucketName;
+ }
+}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
new file mode 100644
index 00000000..88af7f2e
--- /dev/null
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkS3ITCase.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.spark;
+
+import org.apache.flink.core.fs.Path;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for using S3 in Spark. */
+public class SparkS3ITCase {
+
+ // Class-level rule: one Minio container shared by all tests in this class.
+ @ClassRule public static final MinioTestContainer MINIO_CONTAINER = new
MinioTestContainer();
+
+ private static SparkSession spark = null;
+
+ // Starts a local Spark session whose "tablestore" catalog warehouse lives in the
+ // Minio default bucket; S3 credentials/endpoint are forwarded as catalog options.
+ @BeforeClass
+ public static void startMetastoreAndSpark() {
+ String path = MINIO_CONTAINER.getS3UriForDefaultBucket() + "/" +
UUID.randomUUID();
+ Path warehousePath = new Path(path);
+ spark = SparkSession.builder().master("local[2]").getOrCreate();
+ spark.conf().set("spark.sql.catalog.tablestore",
SparkCatalog.class.getName());
+ spark.conf().set("spark.sql.catalog.tablestore.warehouse",
warehousePath.toString());
+ MINIO_CONTAINER
+ .getS3ConfigOptions()
+ .toMap()
+ .forEach((k, v) ->
spark.conf().set("spark.sql.catalog.tablestore." + k, v));
+ spark.sql("CREATE DATABASE tablestore.db");
+ spark.sql("USE tablestore.db");
+ }
+
+ // NOTE(review): despite the name "afterEach", @AfterClass runs this once per class,
+ // and the Spark session is never stopped here — confirm whether that is intended.
+ @AfterClass
+ public static void afterEach() {
+ spark.sql("DROP TABLE T");
+ }
+
+ // Round-trip: create an avro table on S3, insert one row, read it back.
+ @Test
+ public void testWriteRead() {
+ spark.sql(
+ "CREATE TABLE T (a INT, b INT, c STRING) TBLPROPERTIES"
+ + " ('primary-key'='a', 'bucket'='4',
'file.format'='avro')");
+ spark.sql("INSERT INTO T VALUES (1, 2, '3')").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+ assertThat(rows.toString()).isEqualTo("[[1,2,3]]");
+ }
+}
diff --git
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
index 2050b63a..46685e48 100644
---
a/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
+++
b/flink-table-store-spark2/src/main/java/org/apache/flink/table/store/spark/SparkSource.java
@@ -19,8 +19,8 @@
package org.apache.flink.table.store.spark;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.filesystem.FileSystems;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.spark.sql.sources.DataSourceRegister;
@@ -50,8 +50,7 @@ public class SparkSource implements DataSourceRegister,
ReadSupport, SessionConf
public DataSourceReader createReader(DataSourceOptions options) {
Configuration configuration =
Configuration.fromMap(SparkCaseSensitiveConverter.convert(options));
- FileSystem.initialize(
- configuration,
PluginUtils.createPluginManagerFromRootFolder(configuration));
+ FileSystems.initialize(CoreOptions.path(configuration), configuration);
return new
SparkDataSourceReader(FileStoreTableFactory.create(configuration));
}