This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 4067c47ed [#6309] fix(flink): remove paimon package from
GravitinoPaimonCatalogFactory (#6314)
4067c47ed is described below
commit 4067c47ed9c67bc6ce1c310990c1d778c0b1c82e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 17 16:39:34 2025 +0800
[#6309] fix(flink): remove paimon package from
GravitinoPaimonCatalogFactory (#6314)
### What changes were proposed in this pull request?
remove paimon package from `GravitinoPaimonCatalogFactory`, because
Paimon package may not in the Flink classpath when loading catalog
factories.
### Why are the changes needed?
Fix: #6309
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
tested in local flink cluster
Co-authored-by: FANNG <[email protected]>
---
.../flink/connector/paimon/GravitinoPaimonCatalog.java | 11 +++++++----
.../flink/connector/paimon/GravitinoPaimonCatalogFactory.java | 11 +++++++----
.../paimon/GravitinoPaimonCatalogFactoryOptions.java | 9 +++++++++
3 files changed, 23 insertions(+), 8 deletions(-)
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
index c22e00fa1..06107b862 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java
@@ -24,11 +24,13 @@ import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;
+import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.FlinkTableFactory;
/**
@@ -40,12 +42,13 @@ public class GravitinoPaimonCatalog extends BaseCatalog {
private final AbstractCatalog paimonCatalog;
protected GravitinoPaimonCatalog(
- String catalogName,
- AbstractCatalog paimonCatalog,
+ CatalogFactory.Context context,
+ String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
- super(catalogName, paimonCatalog.getDefaultDatabase(),
propertiesConverter, partitionConverter);
- this.paimonCatalog = paimonCatalog;
+ super(context.getName(), defaultDatabase, propertiesConverter,
partitionConverter);
+ FlinkCatalogFactory flinkCatalogFactory = new FlinkCatalogFactory();
+ this.paimonCatalog = flinkCatalogFactory.createCatalog(context);
}
@Override
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
index 52489fc66..8732ade23 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java
@@ -23,12 +23,12 @@ import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
-import org.apache.paimon.flink.FlinkCatalog;
-import org.apache.paimon.flink.FlinkCatalogFactory;
+import org.apache.gravitino.flink.connector.utils.FactoryUtils;
/**
* Factory for creating instances of {@link GravitinoPaimonCatalog}. It will
be created by SPI
@@ -38,9 +38,12 @@ public class GravitinoPaimonCatalogFactory implements
BaseCatalogFactory {
@Override
public Catalog createCatalog(Context context) {
- FlinkCatalog catalog = new FlinkCatalogFactory().createCatalog(context);
+ final FactoryUtil.CatalogFactoryHelper helper =
+ FactoryUtils.createCatalogFactoryHelper(this, context);
+ String defaultDatabase =
+
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
return new GravitinoPaimonCatalog(
- context.getName(), catalog, propertiesConverter(),
partitionConverter());
+ context, defaultDatabase, propertiesConverter(), partitionConverter());
}
@Override
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java
index dd78f96d2..a4180b9eb 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactoryOptions.java
@@ -19,8 +19,17 @@
package org.apache.gravitino.flink.connector.paimon;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.paimon.flink.FlinkCatalogOptions;
+
public class GravitinoPaimonCatalogFactoryOptions {
/** Identifier for the {@link GravitinoPaimonCatalog}. */
public static final String IDENTIFIER = "gravitino-paimon";
+
+ public static final ConfigOption<String> DEFAULT_DATABASE =
+ ConfigOptions.key(FlinkCatalogOptions.DEFAULT_DATABASE.key())
+ .stringType()
+ .defaultValue(FlinkCatalogOptions.DEFAULT_DATABASE.defaultValue());
}