This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 58beebec9 [flink] Support custom initial flink catalog & env in ActionBase (#3152)
58beebec9 is described below

commit 58beebec9646723ce2615fda11493591d1128ecc
Author: melin <[email protected]>
AuthorDate: Sun Apr 7 15:07:54 2024 +0800

    [flink] Support custom initial flink catalog & env in ActionBase (#3152)
---
 .../java/org/apache/paimon/flink/action/ActionBase.java    | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 3a9e39b4e..dd32c52c6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -55,8 +55,8 @@ public abstract class ActionBase implements Action {
         catalogOptions = Options.fromMap(catalogConfig);
         catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
 
-        catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
-        flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions);
+        catalog = initPaimonCatalog();
+        flinkCatalog = initFlinkCatalog();
 
         // use the default env if user doesn't pass one
         initFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment());
@@ -67,7 +67,15 @@ public abstract class ActionBase implements Action {
         return this;
     }
 
-    private void initFlinkEnv(StreamExecutionEnvironment env) {
+    protected Catalog initPaimonCatalog() {
+        return FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
+    }
+
+    protected FlinkCatalog initFlinkCatalog() {
+        return FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions);
+    }
+
+    protected void initFlinkEnv(StreamExecutionEnvironment env) {
         this.env = env;
         // we enable object reuse, we copy the un-reusable object ourselves.
         this.env.getConfig().enableObjectReuse();

Reply via email to