This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 7e0d55ff [FLINK-30453] Fix 'can't find CatalogFactory' error when using FLINK sql-client to add table store bundle jar
7e0d55ff is described below
commit 7e0d55ff3dc9fd48455b17d9a439647b0554d020
Author: yuzelin <[email protected]>
AuthorDate: Tue Dec 20 19:40:13 2022 +0800
[FLINK-30453] Fix 'can't find CatalogFactory' error when using FLINK sql-client to add table store bundle jar
This closes #442.
---
.../flink/table/store/connector/FlinkCatalogFactory.java | 12 +++++++++---
.../apache/flink/table/store/connector/FlinkCatalogTest.java | 4 +++-
.../flink/table/store/file/catalog/CatalogFactory.java | 10 +++++++++-
3 files changed, 21 insertions(+), 5 deletions(-)
diff --git a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
index 9c70ffe9..43665476 100644
--- a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
+++ b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalogFactory.java
@@ -54,11 +54,17 @@ public class FlinkCatalogFactory implements org.apache.flink.table.factories.Cat
@Override
public FlinkCatalog createCatalog(Context context) {
- return createCatalog(context.getName(), Configuration.fromMap(context.getOptions()));
+ return createCatalog(
+ context.getName(),
+ Configuration.fromMap(context.getOptions()),
+ context.getClassLoader());
}
- public static FlinkCatalog createCatalog(String catalogName, Configuration options) {
+ public static FlinkCatalog createCatalog(
+ String catalogName, Configuration options, ClassLoader classLoader) {
return new FlinkCatalog(
- CatalogFactory.createCatalog(options), catalogName, options.get(DEFAULT_DATABASE));
+ CatalogFactory.createCatalog(options, classLoader),
+ catalogName,
+ options.get(DEFAULT_DATABASE));
}
}
diff --git a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
index e7a6d6fe..93271875 100644
--- a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
+++ b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FlinkCatalogTest.java
@@ -75,7 +75,9 @@ public class FlinkCatalogTest {
String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
Configuration conf = new Configuration();
conf.setString("warehouse", path);
- catalog = FlinkCatalogFactory.createCatalog("test-catalog", conf);
+ catalog =
+ FlinkCatalogFactory.createCatalog(
+ "test-catalog", conf, FlinkCatalogTest.class.getClassLoader());
}
private ResolvedSchema createSchema() {
diff --git a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
index 66129710..53a68e02 100644
--- a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
+++ b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/CatalogFactory.java
@@ -49,7 +49,15 @@ public interface CatalogFactory {
return new Path(warehouse);
}
+ /**
+ * If the ClassLoader is not specified, using the context ClassLoader of current thread as
+ * default.
+ */
static Catalog createCatalog(Configuration options) {
+ return createCatalog(options, Thread.currentThread().getContextClassLoader());
+ }
+
+ static Catalog createCatalog(Configuration options, ClassLoader classLoader) {
// manual validation
// because different catalog types may have different options
// we can't list them all in the optionalOptions() method
@@ -57,7 +65,7 @@ public interface CatalogFactory {
String metastore = options.get(METASTORE);
List<CatalogFactory> factories = new ArrayList<>();
- ServiceLoader.load(CatalogFactory.class, Thread.currentThread().getContextClassLoader())
+ ServiceLoader.load(CatalogFactory.class, classLoader)
.iterator()
.forEachRemaining(
f -> {