This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new f30b699ce [Improve] k8s application mode trigger savepoint bug fixed.
(#3995)
f30b699ce is described below
commit f30b699ce7c64dcbdd2a5ddbc43af1c6d6c5db8b
Author: benjobs <[email protected]>
AuthorDate: Mon Aug 26 23:59:46 2024 +0800
[Improve] k8s application mode trigger savepoint bug fixed. (#3995)
---
logback-spring.xml | 146 ---------------------
.../console/core/controller/ProxyController.java | 6 +
.../console/core/entity/Application.java | 1 +
.../core/service/impl/SavepointServiceImpl.java | 24 ++--
.../src/api/flink/app/app.ts | 2 -
.../flink/client/impl/RemoteClient.scala | 8 +-
6 files changed, 25 insertions(+), 162 deletions(-)
diff --git a/logback-spring.xml b/logback-spring.xml
deleted file mode 100644
index c7daf7a0e..000000000
--- a/logback-spring.xml
+++ /dev/null
@@ -1,146 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<configuration scan="true" scanPeriod="60 seconds" debug="false">
- <contextName>StreamPark</contextName>
- <property name="LOG_HOME" value="${app.home}/logs"/>
- <property name="LOG_MAX_HISTORY" value="15"/>
- value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger):%L] %msg%n"/>
- <property name="LOG_PATTERN"
- value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread]
%-5level %logger{36}:%L - %msg%n"/>
- <property name="COLOR_PATTERN"
- value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger):%L] %msg%n"/>
-
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
- <pattern>${COLOR_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <appender name="INFO"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>INFO</level>
- <onMatch>DENY</onMatch>
- <onMismatch>ACCEPT</onMismatch>
- </filter>
- <file>${LOG_HOME}/info.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
<fileNamePattern>${LOG_HOME}/info.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
- <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
- <maxFileSize>50MB</maxFileSize>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <appender name="ERROR"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>ERROR</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <file>${LOG_HOME}/error.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
<fileNamePattern>${LOG_HOME}/error.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
- <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
- <maxFileSize>50MB</maxFileSize>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <appender name="WARN"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>WARN</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <file>${LOG_HOME}/warn.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
<fileNamePattern>${LOG_HOME}/warn.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
- <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
- <maxFileSize>50MB</maxFileSize>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <appender name="DEBUG"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>DEBUG</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <file>${LOG_HOME}/debug.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
<fileNamePattern>${LOG_HOME}/debug.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
- <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
- <maxFileSize>50MB</maxFileSize>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <appender name="TRACE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>TRACE</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <file>${LOG_HOME}/trace.log</file>
- <rollingPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
-
<fileNamePattern>${LOG_HOME}/trace.%d{yyyy-MM-dd}_%i.log</fileNamePattern>
- <maxHistory>${LOG_MAX_HISTORY}</maxHistory>
- <maxFileSize>50MB</maxFileSize>
- <totalSizeCap>1GB</totalSizeCap>
- </rollingPolicy>
- <encoder>
- <pattern>${LOG_PATTERN}</pattern>
- <charset>UTF-8</charset>
- </encoder>
- </appender>
-
- <!-- log level -->
- <logger name="org.springframework.web" level="INFO"/>
- <logger name="com.apache.ibatis" level="TRACE"/>
- <logger name="java.sql.Connection" level="DEBUG"/>
- <logger name="java.sql.Statement" level="DEBUG"/>
- <logger name="java.sql.PreparedStatement" level="DEBUG"/>
-
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- <appender-ref ref="INFO"/>
- <appender-ref ref="WARN"/>
- <appender-ref ref="ERROR"/>
- <appender-ref ref="DEBUG"/>
- <appender-ref ref="TRACE"/>
- </root>
-
-</configuration>
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
index 6b1709c20..baf178b2e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProxyController.java
@@ -19,6 +19,8 @@ package org.apache.streampark.console.core.controller;
import org.apache.streampark.console.core.service.ProxyService;
+import org.apache.shiro.authz.annotation.RequiresPermissions;
+
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
@@ -39,24 +41,28 @@ public class ProxyController {
@Autowired private ProxyService proxyService;
@GetMapping("flink/{id}/**")
+ @RequiresPermissions("app:view")
public ResponseEntity<?> proxyFlink(HttpServletRequest request,
@PathVariable("id") Long id)
throws Exception {
return proxyService.proxyFlink(request, id);
}
@GetMapping("cluster/{id}/**")
+ @RequiresPermissions("app:view")
public ResponseEntity<?> proxyCluster(HttpServletRequest request,
@PathVariable("id") Long id)
throws Exception {
return proxyService.proxyCluster(request, id);
}
@GetMapping("history/{id}/**")
+ @RequiresPermissions("app:view")
public ResponseEntity<?> proxyHistory(HttpServletRequest request,
@PathVariable("id") Long id)
throws Exception {
return proxyService.proxyHistory(request, id);
}
@GetMapping("yarn/{id}/**")
+ @RequiresPermissions("app:view")
public ResponseEntity<?> proxyYarn(HttpServletRequest request,
@PathVariable("id") Long logId)
throws Exception {
return proxyService.proxyYarn(request, logId);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 61ec14784..79c168e7b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -206,6 +206,7 @@ public class Application implements Serializable {
private String k8sPodTemplate;
private String k8sJmPodTemplate;
+
private String k8sTmPodTemplate;
private String ingressTemplate;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
index fec264c8a..856858b87 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java
@@ -400,23 +400,23 @@ public class SavepointServiceImpl extends
ServiceImpl<SavepointMapper, Savepoint
}
private String getClusterId(Application application, FlinkCluster cluster) {
- if (ExecutionMode.isKubernetesMode(application.getExecutionMode())) {
- return
ExecutionMode.isKubernetesSessionMode(application.getExecutionMode())
- ? cluster.getClusterId()
- : application.getClusterId();
- } else if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
- if
(ExecutionMode.YARN_SESSION.equals(application.getExecutionModeEnum())) {
+ switch (application.getExecutionModeEnum()) {
+ case YARN_APPLICATION:
+ case YARN_PER_JOB:
+ return application.getClusterId();
+ case KUBERNETES_NATIVE_APPLICATION:
+ return application.getJobName();
+ case KUBERNETES_NATIVE_SESSION:
+ case YARN_SESSION:
Utils.notNull(
cluster,
String.format(
- "The yarn session clusterId=%s cannot be find, maybe the
clusterId is wrong or the cluster has been deleted. Please contact the Admin.",
- application.getFlinkClusterId()));
+ "The %s clusterId=%s cannot be find, maybe the clusterId is
wrong or the cluster has been deleted. Please contact the Admin.",
+ application.getExecutionModeEnum().getName(),
application.getFlinkClusterId()));
return cluster.getClusterId();
- } else {
- return application.getClusterId();
- }
+ default:
+ return null;
}
- return null;
}
@Override
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
index ccb0d868f..348dfc125 100644
--- a/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
+++ b/streampark-console/streampark-console-webapp/src/api/flink/app/app.ts
@@ -47,8 +47,6 @@ enum APP_API {
REVOKE = '/flink/app/revoke',
OPTION_LOG = '/flink/app/optionlog',
DELETE_OPERATION_LOG = '/flink/app/deleteOperationLog',
- CHECK_JAR = '/flink/app/checkjar',
- VERIFY_SCHEMA = '/flink/app/verifySchema',
CHECK_SAVEPOINT_PATH = '/flink/app/checkSavepointPath',
}
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
index f8a433305..466121372 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/RemoteClient.scala
@@ -85,8 +85,12 @@ object RemoteClient extends FlinkClientTrait {
e.printStackTrace()
throw e
} finally {
- if (client != null) client.close()
- if (standAloneDescriptor != null) standAloneDescriptor._2.close()
+ if (client != null) {
+ client.close()
+ }
+ if (standAloneDescriptor != null) {
+ standAloneDescriptor._2.close()
+ }
}
}