This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch env in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 25475f344d9df485bd3c63355ac86b16946e4519 Author: benjobs <[email protected]> AuthorDate: Sun Sep 10 21:36:05 2023 +0800 [Improve] EnvUtils minor improvement --- .../apache/streampark/common/util/EnvUtils.java | 41 ---------------------- .../streampark/common/util/HadoopConfigUtils.scala | 4 +-- .../common/util/SystemPropertyUtils.scala | 16 +++++++++ .../flink/client/trait/FlinkClientTrait.scala | 4 +-- 4 files changed, 20 insertions(+), 45 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java b/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java deleted file mode 100644 index a8fd54be3..000000000 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/EnvUtils.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.streampark.common.util; - -import java.lang.reflect.Field; -import java.lang.reflect.Method; -import java.util.Map; - - -public class EnvUtils { - public static void setEnv(String name, String value) throws Exception { - getModifiableEnvironment().put(name, value); - } - - @SuppressWarnings("unchecked") - private static Map<String, String> getModifiableEnvironment() throws Exception { - Class<?> pe = Class.forName("java.lang.ProcessEnvironment"); - Method getenv = pe.getDeclaredMethod("getenv"); - getenv.setAccessible(true); - Object unmodifiableEnvironment = getenv.invoke(null); - Class<?> map = Class.forName("java.util.Collections$UnmodifiableMap"); - Field m = map.getDeclaredField("m"); - m.setAccessible(true); - return (Map<String, String>) m.get(unmodifiableEnvironment); - } -} - diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala index 7fa179177..396a8467e 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopConfigUtils.scala @@ -32,10 +32,10 @@ import scala.util.{Failure, Success, Try} /** Hadoop client configuration tools mainly for flink use. */ object HadoopConfigUtils { - val HADOOP_CLIENT_CONF_FILES: Array[String] = + private[this] val HADOOP_CLIENT_CONF_FILES: Array[String] = Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml") - val HIVE_CLIENT_CONF_FILES: Array[String] = + private[this] val HIVE_CLIENT_CONF_FILES: Array[String] = Array("core-site.xml", "hdfs-site.xml", "hive-site.xml") /** Get Hadoop configuration directory path from system. */ diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala index 11dad97ff..864f3aa44 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SystemPropertyUtils.scala @@ -18,6 +18,7 @@ package org.apache.streampark.common.util import java.io.File import java.security.{AccessController, PrivilegedAction} +import java.util import scala.util.{Failure, Success, Try} @@ -96,6 +97,21 @@ object SystemPropertyUtils extends Logger { def set(key: String, value: String): String = System.getProperties.setProperty(key, value).asInstanceOf[String] + @throws[Exception] + def setEnv(name: String, value: String): Unit = { + val envClass = Class.forName("java.lang.ProcessEnvironment") + val getEnv = envClass.getDeclaredMethod("getenv") + getEnv.setAccessible(true) + val unmodifiableEnvironment = getEnv.invoke(null) + val clazz = Class.forName("java.util.Collections$UnmodifiableMap") + val field = clazz.getDeclaredField("m") + field.setAccessible(true) + field + .get(unmodifiableEnvironment) + .asInstanceOf[util.Map[String, String]] + .put(name, value) + } + def getOrElseUpdate(key: String, default: String): String = { get(key) match { case null => diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index e6b16e377..de7907b89 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -21,7 +21,7 @@ import org.apache.streampark.common.conf.{ConfigConst, Workspace} import org.apache.streampark.common.conf.ConfigConst._ import org.apache.streampark.common.enums.{ApplicationType, DevelopmentMode, ExecutionMode, RestoreMode} import org.apache.streampark.common.fs.FsOperator -import org.apache.streampark.common.util.{DeflaterUtils, EnvUtils, Logger} +import org.apache.streampark.common.util.{DeflaterUtils, Logger, SystemPropertyUtils} import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.core.FlinkClusterClient import org.apache.streampark.flink.core.conf.FlinkRunOption @@ -97,7 +97,7 @@ trait FlinkClientTrait extends Logger { if (StringUtils.isBlank(flinkOptPath)) { logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail") val flinkHome = submitRequest.flinkVersion.flinkHome - EnvUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt"); + SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, s"$flinkHome/opt") logInfo( s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $flinkHome/opt") }
