This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
new 9a0295147 [Bug][Flink-Submit][dev-2.1.4]Fixed security issues with
submitting external applications to manipulate the Jvm FLINK (#3661)
9a0295147 is described below
commit 9a0295147501ce690975c4fb9628aeb2b11c8ced
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Apr 10 19:41:29 2024 +0800
[Bug][Flink-Submit][dev-2.1.4]Fixed security issues with submitting
external applications to manipulate the Jvm FLINK (#3661)
---
.../streampark/flink/client/FlinkClient.scala | 21 +++++++++++++++++++--
1 file changed, 19 insertions(+), 2 deletions(-)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 3c7c65dcb..867625695 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -22,9 +22,10 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.proxy.FlinkShimsProxy
+import java.security.Permission
+
import scala.language.{implicitConversions, reflectiveCalls}
import scala.reflect.ClassTag
-
object FlinkClient extends Logger {
private[this] val FLINK_CLIENT_HANDLER_CLASS_NAME =
@@ -46,7 +47,13 @@ object FlinkClient extends Logger {
"org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> "triggerSavepoint"
def submit(submitRequest: SubmitRequest): SubmitResponse = {
- proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
+ val securityManager = System.getSecurityManager
+ try {
+ System.setSecurityManager(new ExitSecurityManager())
+ proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
+ } finally {
+ System.setSecurityManager(securityManager)
+ }
}
def cancel(stopRequest: CancelRequest): CancelResponse = {
@@ -87,3 +94,13 @@ object FlinkClient extends Logger {
}
}
+
+/** Used to mask JVM requests for external operations */
+class ExitSecurityManager extends SecurityManager {
+ override def checkExit(status: Int): Unit = {
+ throw new SecurityException(
+ s"System.exit($status) was called in your flink job, The job has been stopped, please check your program...")
+ }
+
+ override def checkPermission(perm: Permission): Unit = {}
+}