This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8fbb53776 [Bug][Flink-Submit] Fixed security issues with submitting 
external applications to manipulate the Jvm (#3659)
8fbb53776 is described below

commit 8fbb537763f4af214b86cb5b72de46bda9c67168
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Apr 10 18:40:39 2024 +0800

    [Bug][Flink-Submit] Fixed security issues with submitting external 
applications to manipulate the Jvm (#3659)
    
    * [Bug][Flink-Submit] Fixed security issues with submitting external 
applications to manipulate the Jvm
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../apache/streampark/flink/client/FlinkClient.scala | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 10d2fd28c..7bc69db05 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.client.bean._
 import org.apache.streampark.flink.proxy.FlinkShimsProxy
 
+import java.security.Permission
+
 import scala.language.{implicitConversions, reflectiveCalls}
 import scala.reflect.ClassTag
 
@@ -46,7 +48,13 @@ object FlinkClient extends Logger {
     "org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> 
"triggerSavepoint"
 
   def submit(submitRequest: SubmitRequest): SubmitResponse = {
-    proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, 
SUBMIT_REQUEST)
+    val securityManager = System.getSecurityManager
+    try {
+      System.setSecurityManager(new ExitSecurityManager())
+      proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion, 
SUBMIT_REQUEST)
+    } finally {
+      System.setSecurityManager(securityManager)
+    }
   }
 
   def cancel(stopRequest: CancelRequest): CancelResponse = {
@@ -87,3 +95,13 @@ object FlinkClient extends Logger {
   }
 
 }
+
+/** SecurityManager whose checkExit always throws, blocking System.exit while it is installed; checkPermission(perm) is a no-op. */
+class ExitSecurityManager extends SecurityManager {
+  override def checkExit(status: Int): Unit = {
+    throw new SecurityException(
+      s"System.exit($status) was called in your flink job, The job has been 
stopped, please check your program...")
+  }
+
+  override def checkPermission(perm: Permission): Unit = {}
+}

Reply via email to