This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.4
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.4 by this push:
     new 9a0295147 [Bug][Flink-Submit][dev-2.1.4]Fixed security issues with submitting external applications to manipulate the Jvm  FLINK (#3661)
9a0295147 is described below

commit 9a0295147501ce690975c4fb9628aeb2b11c8ced
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Apr 10 19:41:29 2024 +0800

    [Bug][Flink-Submit][dev-2.1.4]Fixed security issues with submitting external applications to manipulate the Jvm  FLINK (#3661)
---
 .../streampark/flink/client/FlinkClient.scala       | 21 +++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 3c7c65dcb..867625695 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -22,9 +22,10 @@ import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.proxy.FlinkShimsProxy
 
+import java.security.Permission
+
 import scala.language.{implicitConversions, reflectiveCalls}
 import scala.reflect.ClassTag
-
 object FlinkClient extends Logger {
 
   private[this] val FLINK_CLIENT_HANDLER_CLASS_NAME =
@@ -46,7 +47,13 @@ object FlinkClient extends Logger {
     "org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> 
"triggerSavepoint"
 
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
-    proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
+    val securityManager = System.getSecurityManager
+    try {
+      System.setSecurityManager(new ExitSecurityManager())
+      proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, SUBMIT_REQUEST)
+    } finally {
+      System.setSecurityManager(securityManager)
+    }
   }
 
   def cancel(stopRequest: CancelRequest): CancelResponse = {
@@ -87,3 +94,13 @@ object FlinkClient extends Logger {
   }
 
 }
+
+/** Used to mask JVM requests for external operations */
+class ExitSecurityManager extends SecurityManager {
+  override def checkExit(status: Int): Unit = {
+    throw new SecurityException(
+      s"System.exit($status) was called in your flink job, The job has been stopped, please check your program...")
+  }
+
+  override def checkPermission(perm: Permission): Unit = {}
+}

Reply via email to