This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e589a422c [flink] Remove compatibility utils that are used for
flink-1.14 (#3054)
e589a422c is described below
commit e589a422c6fe121ac171ae0d30a39a321de8d5b7
Author: yuzelin <[email protected]>
AuthorDate: Wed Mar 20 12:42:37 2024 +0800
[flink] Remove compatibility utils that are used for flink-1.14 (#3054)
---
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 6 +--
.../org/apache/paimon/flink/TableConfigUtils.java | 52 -------------------
.../org/apache/paimon/flink/action/ActionBase.java | 3 +-
.../apache/paimon/flink/action/CompactAction.java | 3 +-
.../paimon/flink/action/CompactDatabaseAction.java | 5 +-
.../paimon/flink/action/TableActionBase.java | 34 ++++++++++--
.../paimon/flink/procedure/ProcedureBase.java | 5 +-
.../apache/paimon/flink/service/QueryService.java | 3 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 6 +--
.../flink/sink/MultiTablesCompactorSink.java | 6 +--
.../utils/StreamExecutionEnvironmentUtils.java | 30 -----------
.../paimon/flink/utils/TableEnvironmentUtils.java | 60 ----------------------
12 files changed, 43 insertions(+), 170 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 99d933d89..c0156e841 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -32,7 +32,6 @@ import org.apache.paimon.flink.sink.StoreMultiCommitter;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -133,10 +132,7 @@ public class FlinkCdcMultiTableSink implements
Serializable {
createCommittableStateManager()))
.setParallelism(input.getParallelism());
configureGlobalCommitter(
- committed,
- commitCpuCores,
- commitHeapMemory,
- StreamExecutionEnvironmentUtils.getConfiguration(env));
+ committed, commitCpuCores, commitHeapMemory,
env.getConfiguration());
return committed.addSink(new
DiscardingSink<>()).name("end").setParallelism(1);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
deleted file mode 100644
index ffeb9b92e..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.table.api.TableConfig;
-
-/** Utils for {@link TableConfig}. */
-public class TableConfigUtils {
-
- public static Configuration extractConfiguration(ReadableConfig
readableConfig) {
- Configuration to = new Configuration();
- copyConfiguration(readableConfig, to);
- return to;
- }
-
- private static void copyConfiguration(ReadableConfig from, Configuration
to) {
- if (from instanceof Configuration) {
- to.addAll((Configuration) from);
- return;
- }
-
- if (!(from instanceof TableConfig)) {
- throw new RuntimeException("Unknown readableConfig type: " +
from.getClass());
- }
-
- TableConfig tableConfig = (TableConfig) from;
-
- // copy root configuration first
- copyConfiguration(tableConfig.getRootConfiguration(), to);
-
- // copy table configuration
- to.addAll(tableConfig.getConfiguration());
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 7f4c03a38..3a9e39b4e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -23,7 +23,6 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
@@ -80,7 +79,7 @@ public abstract class ActionBase implements Action {
}
protected void execute(String defaultName) throws Exception {
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
String name =
conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
env.execute(name);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 313a7a5d4..e24f8d9a9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -75,7 +74,7 @@ public class CompactAction extends TableActionBase {
@Override
public void build() {
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
FileStoreTable fileStoreTable = (FileStoreTable) table;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
index a97b3048e..408d5768c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.sink.MultiTablesCompactorSink;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -161,7 +160,7 @@ public class CompactDatabaseAction extends ActionBase {
!tableMap.isEmpty(),
"no tables to be compacted. possible cause is that there are
no tables detected after pattern matching");
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
@@ -186,7 +185,7 @@ public class CompactDatabaseAction extends ActionBase {
private void buildForCombinedMode() {
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
// TODO: Currently, multi-tables compaction don't support tables which
bucketmode is UNWARE.
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 06d234469..a97335cd2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
-import org.apache.paimon.flink.utils.TableEnvironmentUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
@@ -30,8 +29,11 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -68,8 +70,34 @@ public abstract class TableActionBase extends ActionBase {
List<String> sinkIdentifierNames =
Collections.singletonList(identifier.getFullName());
- return TableEnvironmentUtils.executeInternal(
- batchTEnv, transformations, sinkIdentifierNames);
+ return executeInternal(transformations, sinkIdentifierNames);
+ }
+
+ /**
+ * Invoke {@code
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
+ * from a {@link StreamTableEnvironment} instance through reflecting.
+ */
+ private TableResult executeInternal(
+ List<Transformation<?>> transformations, List<String>
sinkIdentifierNames) {
+ Class<?> clazz = batchTEnv.getClass().getSuperclass().getSuperclass();
+ try {
+ Method executeInternal =
+ clazz.getDeclaredMethod("executeInternal", List.class,
List.class);
+ executeInternal.setAccessible(true);
+
+ return (TableResult)
+ executeInternal.invoke(batchTEnv, transformations,
sinkIdentifierNames);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException(
+ "Failed to get 'TableEnvironmentImpl#executeInternal(List,
List)' method "
+ + "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
+ e);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(
+ "Failed to invoke
'TableEnvironmentImpl#executeInternal(List, List)' method "
+ + "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
+ e);
+ }
}
/**
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index fd7f74148..7d5542109 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.flink.action.ActionBase;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.StringUtils;
@@ -69,13 +68,13 @@ public abstract class ProcedureBase implements Procedure,
Factory {
protected String[] execute(ProcedureContext procedureContext, JobClient
jobClient) {
StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
return execute(jobClient, conf.get(TABLE_DML_SYNC));
}
protected String[] execute(StreamExecutionEnvironment env, String
defaultJobName)
throws Exception {
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
String name =
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC));
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
index 5b2c13c84..8a4814d0a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.service;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -38,7 +37,7 @@ import static
org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
public class QueryService {
public static void build(StreamExecutionEnvironment env, Table table, int
parallelism) {
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
Preconditions.checkArgument(
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING,
"Query Service only supports streaming mode.");
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 545bd7f07..582fcfc35 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -190,8 +189,7 @@ public abstract class FlinkSink<T> implements Serializable {
DataStream<T> input, String commitUser, @Nullable Integer
parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
- StreamExecutionEnvironmentUtils.getConfiguration(env)
- .get(ExecutionOptions.RUNTIME_MODE)
+ env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
boolean writeOnly = table.coreOptions().writeOnly();
@@ -222,7 +220,7 @@ public abstract class FlinkSink<T> implements Serializable {
protected DataStreamSink<?> doCommit(DataStream<Committable> written,
String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index d9ded153d..659296633 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
-import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
@@ -87,8 +86,7 @@ public class MultiTablesCompactorSink implements Serializable
{
DataStream<RowData> input, String commitUser, Integer parallelism)
{
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
- StreamExecutionEnvironmentUtils.getConfiguration(env)
- .get(ExecutionOptions.RUNTIME_MODE)
+ env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;
SingleOutputStreamOperator<MultiTableCommittable> written =
@@ -112,7 +110,7 @@ public class MultiTablesCompactorSink implements
Serializable {
protected DataStreamSink<?> doCommit(
DataStream<MultiTableCommittable> written, String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
- ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ ReadableConfig conf = env.getConfiguration();
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
deleted file mode 100644
index 9f3b28bbe..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
-/** Utility methods for {@link StreamExecutionEnvironment}. */
-public class StreamExecutionEnvironmentUtils {
-
- public static ReadableConfig getConfiguration(StreamExecutionEnvironment
env) {
- return env.getConfiguration();
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
deleted file mode 100644
index 6a5820ce8..000000000
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-
-/** Utility methods for {@link TableEnvironment} and its subclasses. */
-public class TableEnvironmentUtils {
-
- /**
- * Invoke {@code
TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
- * from a {@link StreamTableEnvironment} instance through reflecting.
- */
- public static TableResult executeInternal(
- StreamTableEnvironment tEnv,
- List<Transformation<?>> transformations,
- List<String> sinkIdentifierNames) {
- Class<?> clazz = tEnv.getClass().getSuperclass().getSuperclass();
- try {
- Method executeInternal =
- clazz.getDeclaredMethod("executeInternal", List.class,
List.class);
- executeInternal.setAccessible(true);
-
- return (TableResult) executeInternal.invoke(tEnv, transformations,
sinkIdentifierNames);
- } catch (NoSuchMethodException e) {
- throw new RuntimeException(
- "Failed to get 'TableEnvironmentImpl#executeInternal(List,
List)' method "
- + "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
- e);
- } catch (IllegalAccessException | InvocationTargetException e) {
- throw new RuntimeException(
- "Failed to invoke
'TableEnvironmentImpl#executeInternal(List, List)' method "
- + "from given StreamTableEnvironment instance by
Java reflection. This is unexpected.",
- e);
- }
- }
-}