This is an automated email from the ASF dual-hosted git repository.
linying pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 48838c5e8 [Improve] EnvUtils minor improvement (#3040)
48838c5e8 is described below
commit 48838c5e8891e0897369f9cc129d09d14408fb6d
Author: benjobs <[email protected]>
AuthorDate: Mon Sep 11 04:33:59 2023 -0500
[Improve] EnvUtils minor improvement (#3040)
---
.../apache/streampark/common/util/EnvUtils.java | 41 ----------------------
.../streampark/common/util/HadoopConfigUtils.scala | 4 +--
.../common/util/SystemPropertyUtils.scala | 16 +++++++++
.../flink/client/trait/FlinkClientTrait.scala | 4 +--
4 files changed, 20 insertions(+), 45 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
deleted file mode 100644
index a8fd54be3..000000000
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.common.util;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Method;
-import java.util.Map;
-
-
-public class EnvUtils {
- public static void setEnv(String name, String value) throws Exception {
- getModifiableEnvironment().put(name, value);
- }
-
- @SuppressWarnings("unchecked")
- private static Map<String, String> getModifiableEnvironment() throws
Exception {
- Class<?> pe = Class.forName("java.lang.ProcessEnvironment");
- Method getenv = pe.getDeclaredMethod("getenv");
- getenv.setAccessible(true);
- Object unmodifiableEnvironment = getenv.invoke(null);
- Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap");
- Field m = map.getDeclaredField("m");
- m.setAccessible(true);
- return (Map<String, String>) m.get(unmodifiableEnvironment);
- }
-}
-
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
index 7fa179177..396a8467e 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala
@@ -32,10 +32,10 @@ import scala.util.{Failure, Success, Try}
/** Hadoop client configuration tools mainly for flink use. */
object HadoopConfigUtils {
- val HADOOP_CLIENT_CONF_FILES: Array[String] =
+ private[this] val HADOOP_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml")
- val HIVE_CLIENT_CONF_FILES: Array[String] =
+ private[this] val HIVE_CLIENT_CONF_FILES: Array[String] =
Array("core-site.xml", "hdfs-site.xml", "hive-site.xml")
/** Get Hadoop configuration directory path from system. */
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
index 11dad97ff..864f3aa44 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.common.util
import java.io.File
import java.security.{AccessController, PrivilegedAction}
+import java.util
import scala.util.{Failure, Success, Try}
@@ -96,6 +97,21 @@ object SystemPropertyUtils extends Logger {
def set(key: String, value: String): String =
System.getProperties.setProperty(key, value).asInstanceOf[String]
+ @throws[Exception]
+ def setEnv(name: String, value: String): Unit = {
+ val envClass = Class.forName("java.lang.ProcessEnvironment")
+ val getEnv = envClass.getDeclaredMethod("getenv")
+ getEnv.setAccessible(true)
+ val unmodifiableEnvironment = getEnv.invoke(null)
+ val clazz = Class.forName("java.util.Collections$UnmodifiableMap")
+ val field = clazz.getDeclaredField("m")
+ field.setAccessible(true)
+ field
+ .get(unmodifiableEnvironment)
+ .asInstanceOf[util.Map[String, String]]
+ .put(name, value)
+ }
+
def getOrElseUpdate(key: String, default: String): String = {
get(key) match {
case null =>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index e6b16e377..de7907b89 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst,
Workspace}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode,
ExecutionMode, RestoreMode}
import org.apache.streampark.common.fs.FsOperator
-import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger}
+import org.apache.streampark.common.util.{DeflaterUtils, Logger,
SystemPropertyUtils}
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.core.FlinkClusterClient
import org.apache.streampark.flink.core.conf.FlinkRunOption
@@ -97,7 +97,7 @@ trait FlinkClientTrait extends Logger {
if (StringUtils.isBlank(flinkOptPath)) {
logWarn(s"Get environment variable
${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
val flinkHome = submitRequest.flinkVersion.flinkHome
- EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR,
s"$flinkHome/opt");
+ SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR,
s"$flinkHome/opt")
logInfo(
s"Set temporary environment variables
${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt")
}