This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 8fbb53776 [Bug][Flink-Submit] Fixed security issues with submitting
external applications to manipulate the Jvm (#3659)
8fbb53776 is described below
commit 8fbb537763f4af214b86cb5b72de46bda9c67168
Author: ZhilinLi <[email protected]>
AuthorDate: Wed Apr 10 18:40:39 2024 +0800
[Bug][Flink-Submit] Fixed security issues with submitting external
applications to manipulate the Jvm (#3659)
* [Bug][Flink-Submit] Fixed security issues with submitting external
applications to manipulate the Jvm
---------
Co-authored-by: benjobs <[email protected]>
---
.../apache/streampark/flink/client/FlinkClient.scala | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
index 10d2fd28c..7bc69db05 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.client.bean._
import org.apache.streampark.flink.proxy.FlinkShimsProxy
+import java.security.Permission
+
import scala.language.{implicitConversions, reflectiveCalls}
import scala.reflect.ClassTag
@@ -46,7 +48,13 @@ object FlinkClient extends Logger {
"org.apache.streampark.flink.client.bean.TriggerSavepointRequest" ->
"triggerSavepoint"
def submit(submitRequest: SubmitRequest): SubmitResponse = {
- proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion,
SUBMIT_REQUEST)
+ val securityManager = System.getSecurityManager
+ try {
+ System.setSecurityManager(new ExitSecurityManager())
+ proxy[SubmitResponse](submitRequest, submitRequest.flinkVersion,
SUBMIT_REQUEST)
+ } finally {
+ System.setSecurityManager(securityManager)
+ }
}
def cancel(stopRequest: CancelRequest): CancelResponse = {
@@ -87,3 +95,13 @@ object FlinkClient extends Logger {
}
}
+
+/** Used to mask JVM requests for external operations */
+class ExitSecurityManager extends SecurityManager {
+ override def checkExit(status: Int): Unit = {
+ throw new SecurityException(
+ s"System.exit($status) was called in your flink job, The job has been
stopped, please check your program...")
+ }
+
+ override def checkPermission(perm: Permission): Unit = {}
+}