This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e589a422c [flink] Remove compatibility utils that are used for flink-1.14 (#3054)
e589a422c is described below

commit e589a422c6fe121ac171ae0d30a39a321de8d5b7
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 20 12:42:37 2024 +0800

    [flink] Remove compatibility utils that are used for flink-1.14 (#3054)
---
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  6 +--
 .../org/apache/paimon/flink/TableConfigUtils.java  | 52 -------------------
 .../org/apache/paimon/flink/action/ActionBase.java |  3 +-
 .../apache/paimon/flink/action/CompactAction.java  |  3 +-
 .../paimon/flink/action/CompactDatabaseAction.java |  5 +-
 .../paimon/flink/action/TableActionBase.java       | 34 ++++++++++--
 .../paimon/flink/procedure/ProcedureBase.java      |  5 +-
 .../apache/paimon/flink/service/QueryService.java  |  3 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  6 +--
 .../flink/sink/MultiTablesCompactorSink.java       |  6 +--
 .../utils/StreamExecutionEnvironmentUtils.java     | 30 -----------
 .../paimon/flink/utils/TableEnvironmentUtils.java  | 60 ----------------------
 12 files changed, 43 insertions(+), 170 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 99d933d89..c0156e841 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -32,7 +32,6 @@ import org.apache.paimon.flink.sink.StoreMultiCommitter;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -133,10 +132,7 @@ public class FlinkCdcMultiTableSink implements 
Serializable {
                                         createCommittableStateManager()))
                         .setParallelism(input.getParallelism());
         configureGlobalCommitter(
-                committed,
-                commitCpuCores,
-                commitHeapMemory,
-                StreamExecutionEnvironmentUtils.getConfiguration(env));
+                committed, commitCpuCores, commitHeapMemory, 
env.getConfiguration());
         return committed.addSink(new 
DiscardingSink<>()).name("end").setParallelism(1);
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
deleted file mode 100644
index ffeb9b92e..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableConfig;
-
-/** Utils for {@link TableConfig}. */
-public class TableConfigUtils {
-
-    public static Configuration extractConfiguration(ReadableConfig 
readableConfig) {
-        Configuration to = new Configuration();
-        copyConfiguration(readableConfig, to);
-        return to;
-    }
-
-    private static void copyConfiguration(ReadableConfig from, Configuration 
to) {
-        if (from instanceof Configuration) {
-            to.addAll((Configuration) from);
-            return;
-        }
-
-        if (!(from instanceof TableConfig)) {
-            throw new RuntimeException("Unknown readableConfig type: " + 
from.getClass());
-        }
-
-        TableConfig tableConfig = (TableConfig) from;
-
-        // copy root configuration first
-        copyConfiguration(tableConfig.getRootConfiguration(), to);
-
-        // copy table configuration
-        to.addAll(tableConfig.getConfiguration());
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 7f4c03a38..3a9e39b4e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.FlinkCatalog;
 import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
@@ -80,7 +79,7 @@ public abstract class ActionBase implements Action {
     }
 
     protected void execute(String defaultName) throws Exception {
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
         env.execute(name);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 313a7a5d4..e24f8d9a9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
 import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.table.FileStoreTable;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -75,7 +74,7 @@ public class CompactAction extends TableActionBase {
 
     @Override
     public void build() {
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         FileStoreTable fileStoreTable = (FileStoreTable) table;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index a97b3048e..408d5768c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
 import org.apache.paimon.flink.sink.MultiTablesCompactorSink;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -161,7 +160,7 @@ public class CompactDatabaseAction extends ActionBase {
                 !tableMap.isEmpty(),
                 "no tables to be compacted. possible cause is that there are 
no tables detected after pattern matching");
 
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
@@ -186,7 +185,7 @@ public class CompactDatabaseAction extends ActionBase {
 
     private void buildForCombinedMode() {
 
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
         // TODO: Currently, multi-tables compaction don't support tables which 
bucketmode is UNWARE.
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 06d234469..a97335cd2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.utils.TableEnvironmentUtils;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Preconditions;
@@ -30,8 +29,11 @@ import org.apache.paimon.utils.Preconditions;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.table.data.RowData;
 
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -68,8 +70,34 @@ public abstract class TableActionBase extends ActionBase {
 
         List<String> sinkIdentifierNames = 
Collections.singletonList(identifier.getFullName());
 
-        return TableEnvironmentUtils.executeInternal(
-                batchTEnv, transformations, sinkIdentifierNames);
+        return executeInternal(transformations, sinkIdentifierNames);
+    }
+
+    /**
+     * Invoke {@code 
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
+     * from a {@link StreamTableEnvironment} instance through reflecting.
+     */
+    private TableResult executeInternal(
+            List<Transformation<?>> transformations, List<String> 
sinkIdentifierNames) {
+        Class<?> clazz = batchTEnv.getClass().getSuperclass().getSuperclass();
+        try {
+            Method executeInternal =
+                    clazz.getDeclaredMethod("executeInternal", List.class, 
List.class);
+            executeInternal.setAccessible(true);
+
+            return (TableResult)
+                    executeInternal.invoke(batchTEnv, transformations, 
sinkIdentifierNames);
+        } catch (NoSuchMethodException e) {
+            throw new RuntimeException(
+                    "Failed to get 'TableEnvironmentImpl#executeInternal(List, 
List)' method "
+                            + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
+                    e);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(
+                    "Failed to invoke 
'TableEnvironmentImpl#executeInternal(List, List)' method "
+                            + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
+                    e);
+        }
     }
 
     /**
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index fd7f74148..7d5542109 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
 import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.StringUtils;
 
@@ -69,13 +68,13 @@ public abstract class ProcedureBase implements Procedure, 
Factory {
 
     protected String[] execute(ProcedureContext procedureContext, JobClient 
jobClient) {
         StreamExecutionEnvironment env = 
procedureContext.getExecutionEnvironment();
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         return execute(jobClient, conf.get(TABLE_DML_SYNC));
     }
 
     protected String[] execute(StreamExecutionEnvironment env, String 
defaultJobName)
             throws Exception {
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         String name = 
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
         return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC));
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
index 5b2c13c84..8a4814d0a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.service;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -38,7 +37,7 @@ import static 
org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
 public class QueryService {
 
     public static void build(StreamExecutionEnvironment env, Table table, int 
parallelism) {
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         Preconditions.checkArgument(
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING,
                 "Query Service only supports streaming mode.");
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 545bd7f07..582fcfc35 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.TagCreationMode;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -190,8 +189,7 @@ public abstract class FlinkSink<T> implements Serializable {
             DataStream<T> input, String commitUser, @Nullable Integer 
parallelism) {
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean isStreaming =
-                StreamExecutionEnvironmentUtils.getConfiguration(env)
-                                .get(ExecutionOptions.RUNTIME_MODE)
+                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
         boolean writeOnly = table.coreOptions().writeOnly();
@@ -222,7 +220,7 @@ public abstract class FlinkSink<T> implements Serializable {
 
     protected DataStreamSink<?> doCommit(DataStream<Committable> written, 
String commitUser) {
         StreamExecutionEnvironment env = written.getExecutionEnvironment();
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index d9ded153d..659296633 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
 
@@ -87,8 +86,7 @@ public class MultiTablesCompactorSink implements Serializable 
{
             DataStream<RowData> input, String commitUser, Integer parallelism) 
{
         StreamExecutionEnvironment env = input.getExecutionEnvironment();
         boolean isStreaming =
-                StreamExecutionEnvironmentUtils.getConfiguration(env)
-                                .get(ExecutionOptions.RUNTIME_MODE)
+                env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                         == RuntimeExecutionMode.STREAMING;
 
         SingleOutputStreamOperator<MultiTableCommittable> written =
@@ -112,7 +110,7 @@ public class MultiTablesCompactorSink implements 
Serializable {
     protected DataStreamSink<?> doCommit(
             DataStream<MultiTableCommittable> written, String commitUser) {
         StreamExecutionEnvironment env = written.getExecutionEnvironment();
-        ReadableConfig conf = 
StreamExecutionEnvironmentUtils.getConfiguration(env);
+        ReadableConfig conf = env.getConfiguration();
         CheckpointConfig checkpointConfig = env.getCheckpointConfig();
         boolean isStreaming =
                 conf.get(ExecutionOptions.RUNTIME_MODE) == 
RuntimeExecutionMode.STREAMING;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
deleted file mode 100644
index 9f3b28bbe..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/** Utility methods for {@link StreamExecutionEnvironment}. */
-public class StreamExecutionEnvironmentUtils {
-
-    public static ReadableConfig getConfiguration(StreamExecutionEnvironment 
env) {
-        return env.getConfiguration();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
deleted file mode 100644
index 6a5820ce8..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-
-/** Utility methods for {@link TableEnvironment} and its subclasses. */
-public class TableEnvironmentUtils {
-
-    /**
-     * Invoke {@code 
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
-     * from a {@link StreamTableEnvironment} instance through reflecting.
-     */
-    public static TableResult executeInternal(
-            StreamTableEnvironment tEnv,
-            List<Transformation<?>> transformations,
-            List<String> sinkIdentifierNames) {
-        Class<?> clazz = tEnv.getClass().getSuperclass().getSuperclass();
-        try {
-            Method executeInternal =
-                    clazz.getDeclaredMethod("executeInternal", List.class, 
List.class);
-            executeInternal.setAccessible(true);
-
-            return (TableResult) executeInternal.invoke(tEnv, transformations, 
sinkIdentifierNames);
-        } catch (NoSuchMethodException e) {
-            throw new RuntimeException(
-                    "Failed to get 'TableEnvironmentImpl#executeInternal(List, 
List)' method "
-                            + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
-                    e);
-        } catch (IllegalAccessException | InvocationTargetException e) {
-            throw new RuntimeException(
-                    "Failed to invoke 
'TableEnvironmentImpl#executeInternal(List, List)' method "
-                            + "from given StreamTableEnvironment instance by 
Java reflection. This is unexpected.",
-                    e);
-        }
-    }
-}

Reply via email to