This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 58beebec9 [flink] Support custom initial flink catalog & env in
ActionBase (#3152)
58beebec9 is described below
commit 58beebec9646723ce2615fda11493591d1128ecc
Author: melin <[email protected]>
AuthorDate: Sun Apr 7 15:07:54 2024 +0800
[flink] Support custom initial flink catalog & env in ActionBase (#3152)
---
.../java/org/apache/paimon/flink/action/ActionBase.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 3a9e39b4e..dd32c52c6 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -55,8 +55,8 @@ public abstract class ActionBase implements Action {
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
- catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
- flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog,
catalogOptions);
+ catalog = initPaimonCatalog();
+ flinkCatalog = initFlinkCatalog();
// use the default env if user doesn't pass one
initFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment());
@@ -67,7 +67,15 @@ public abstract class ActionBase implements Action {
return this;
}
- private void initFlinkEnv(StreamExecutionEnvironment env) {
+ protected Catalog initPaimonCatalog() {
+ return FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+ }
+
+ protected FlinkCatalog initFlinkCatalog() {
+ return FlinkCatalogFactory.createCatalog(catalogName, catalog,
catalogOptions);
+ }
+
+ protected void initFlinkEnv(StreamExecutionEnvironment env) {
this.env = env;
// we enable object reuse, we copy the un-reusable object ourselves.
this.env.getConfig().enableObjectReuse();