This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 9502711cc [Fix] Fix the issue of Scala calling Mybatis LambdaWrapper
with Java function reference (#3164)
9502711cc is described below
commit 9502711cca4488bbaa3a974897570ab6671c6456
Author: Linying Assad <[email protected]>
AuthorDate: Fri Sep 22 22:06:47 2023 +0800
[Fix] Fix the issue of Scala calling Mybatis LambdaWrapper with Java
function reference (#3164)
* [feat][flink-k8s-v2] Migrate the creation process of the flink deploy
ingress to the Flink kubernetes operator (#2879)
* Fix the incorrect type conversion of the streampark.flink-k8s.enable-v2
option by declaring its class type as java.lang.Boolean.
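* Fix Scala calls to the MyBatis-Plus LambdaQueryWrapper/LambdaUpdateWrapper
that take Java method-reference (SFunction) parameters, by defining the
references as static SFunc constants on the Java entities and removing
MybatisScalaExt.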
* Resolve conflict
* Format code
---
.../streampark/common/conf/K8sFlinkConfig.scala | 2 +-
.../console/core/entity/Application.java | 22 +++++++
.../console/core/entity/FlinkCluster.java | 13 +++++
.../console/core/task/FlinkK8sObserverBroker.scala | 67 +++++++++++-----------
.../core/task/FlinkK8sObserverBrokerSidecar.scala | 5 +-
.../console/core/utils/MybatisScalaExt.scala | 59 -------------------
6 files changed, 73 insertions(+), 95 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index f5a20923a..39f3fc3d4 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -26,7 +26,7 @@ object K8sFlinkConfig {
val ENABLE_V2: InternalOption = InternalOption(
key = "streampark.flink-k8s.enable-v2",
defaultValue = false,
- classType = classOf[Boolean],
+ classType = classOf[java.lang.Boolean],
description =
"Whether to enable the v2 version(base on flink-kubernetes-operator) of
flink kubernetes operation"
)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 0c1593662..6299bcb24 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -44,6 +44,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
import lombok.SneakyThrows;
@@ -587,4 +588,25 @@ public class Application implements Serializable {
public int hashCode() {
return Objects.hash(id);
}
+
+ public static class SFunc {
+ public static final SFunction<Application, Long> ID = Application::getId;
+ public static final SFunction<Application, String> JOB_ID =
Application::getJobId;
+ public static final SFunction<Application, Date> START_TIME =
Application::getStartTime;
+ public static final SFunction<Application, Date> END_TIME =
Application::getEndTime;
+ public static final SFunction<Application, Long> DURATION =
Application::getDuration;
+ public static final SFunction<Application, Integer> TOTAL_TASK =
Application::getTotalTask;
+ public static final SFunction<Application, Integer> TOTAL_TM =
Application::getTotalTM;
+ public static final SFunction<Application, Integer> TOTAL_SLOT =
Application::getTotalSlot;
+ public static final SFunction<Application, Integer> JM_MEMORY =
Application::getJmMemory;
+ public static final SFunction<Application, Integer> TM_MEMORY =
Application::getTmMemory;
+ public static final SFunction<Application, Integer> STATE =
Application::getState;
+ public static final SFunction<Application, String> OPTIONS =
Application::getOptions;
+ public static final SFunction<Application, Integer> AVAILABLE_SLOT =
+ Application::getAvailableSlot;
+ public static final SFunction<Application, Integer> EXECUTION_MODE =
+ Application::getExecutionMode;
+ public static final SFunction<Application, String> JOB_MANAGER_URL =
+ Application::getJobManagerUrl;
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 6243d6f55..a09727e2c 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -36,6 +36,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -186,4 +187,16 @@ public class FlinkCluster implements Serializable {
}
return map;
}
+
+ public static class SFunc {
+ public static final SFunction<FlinkCluster, Long> ID = FlinkCluster::getId;
+ public static final SFunction<FlinkCluster, String> ADDRESS =
FlinkCluster::getAddress;
+ public static final SFunction<FlinkCluster, String> JOB_MANAGER_URL =
+ FlinkCluster::getJobManagerUrl;
+ public static final SFunction<FlinkCluster, Integer> CLUSTER_STATE =
+ FlinkCluster::getClusterState;
+ public static final SFunction<FlinkCluster, Integer> EXECUTION_MODE =
+ FlinkCluster::getExecutionMode;
+ public static final SFunction<FlinkCluster, String> EXCEPTION =
FlinkCluster::getException;
+ }
}
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
index 66251914c..47ef5c536 100644
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -30,13 +30,14 @@ import
org.apache.streampark.console.core.service.alert.AlertService
import
org.apache.streampark.console.core.service.application.ApplicationInfoService
import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter
import
org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.{applicationToTrackKey,
clusterMetricsToFlinkMetricCV, flinkClusterToClusterKey,
k8sDeployStateToClusterState}
-import org.apache.streampark.console.core.utils.MybatisScalaExt.{lambdaQuery,
lambdaUpdate, LambdaQueryOps, LambdaUpdateOps}
import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
import org.apache.streampark.flink.kubernetes.v2.model._
import
org.apache.streampark.flink.kubernetes.v2.model.TrackKey.{ApplicationJobKey,
ClusterKey}
import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
import
org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserverSnapSubscriptionHelper.{ClusterMetricsSnapsSubscriptionOps,
DeployCRSnapsSubscriptionOps, RestSvcEndpointSnapsSubscriptionOps}
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import zio.{Fiber, Ref, UIO, ZIO}
@@ -49,8 +50,6 @@ import javax.annotation.{PostConstruct, PreDestroy}
import java.lang
import java.util.Date
-import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
-
/** Broker of FlinkK8sObserver which is the observer for Flink on Kubernetes */
@Component
class FlinkK8sObserverBroker @Autowired() (
@@ -113,7 +112,8 @@ class FlinkK8sObserverBroker @Autowired() (
val fromApplicationRecords: UIO[Unit] = it
.safeFindApplication(
- lambdaQuery[Application].typedIn(_.getExecutionMode,
ExecutionMode.getKubernetesMode.asScala)
+ new LambdaQueryWrapper[Application]
+ .in(Application.SFunc.EXECUTION_MODE,
ExecutionMode.getKubernetesMode)
)(10)
.map(apps => apps.map(app =>
applicationToTrackKey(app)).filterSome.toVector)
.tap(keys => logInfo(s"Restore Flink K8s track-keys from Application
records:\n${keys.prettyStr}"))
@@ -121,7 +121,8 @@ class FlinkK8sObserverBroker @Autowired() (
val fromFlinkClusterRecords: UIO[Unit] = it
.safeFindFlinkClusterRecord(
- lambdaQuery[FlinkCluster].typedEq(_.getExecutionMode,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
+ new LambdaQueryWrapper[FlinkCluster]
+ .eq(FlinkCluster.SFunc.EXECUTION_MODE,
ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
)(10)
.map(clusters => clusters.map(e => TrackKey.cluster(e.getId,
e.getK8sNamespace, e.getClusterId)))
.tap(keys => logInfo(s"Restore Flink K8s track-keys from FlinkCluster
records:\n${keys.prettyStr}"))
@@ -140,27 +141,27 @@ class FlinkK8sObserverBroker @Autowired() (
.tap { case (snap: JobSnapshot, convertedState: FlinkAppState) =>
safeUpdateApplicationRecord(snap.appId) {
- var update = lambdaUpdate[Application]
- .typedSet(_.getState, convertedState.getValue)
- .typedSet(_.getOptions, OptionState.NONE.getValue)
+ var update = new LambdaUpdateWrapper[Application]
+ .set(Application.SFunc.STATE, convertedState.getValue)
+ .set(Application.SFunc.OPTIONS, OptionState.NONE.getValue)
// update JobStatus related columns
snap.jobStatus.foreach { status =>
update = update
- .typedSet(_.getJobId, status.jobId)
- .typedSet(_.getStartTime, new Date(status.startTs))
- .typedSet(_.getEndTime, status.endTs.map(new Date(_)).orNull)
- .typedSet(_.getDuration, status.duration)
- .typedSet(_.getTotalTask, status.tasks.map(_.total).getOrElse(0))
+ .set(Application.SFunc.JOB_ID, status.jobId)
+ .set(Application.SFunc.START_TIME, new Date(status.startTs))
+ .set(Application.SFunc.END_TIME, status.endTs.map(new
Date(_)).orNull)
+ .set(Application.SFunc.DURATION, status.duration)
+ .set(Application.SFunc.TOTAL_TASK,
status.tasks.map(_.total).getOrElse(0))
}
// Copy the logic from
resources/mapper/core/ApplicationMapper.xml:persistMetrics
if (FlinkAppState.isEndState(convertedState.getValue)) {
update = update
- .typedSet(_.getTotalTM, null)
- .typedSet(_.getTotalSlot, null)
- .typedSet(_.getTotalSlot, null)
- .typedSet(_.getAvailableSlot, null)
- .typedSet(_.getJmMemory, null)
- .typedSet(_.getTmMemory, null)
+ .set(Application.SFunc.TOTAL_TM, null)
+ .set(Application.SFunc.TOTAL_SLOT, null)
+ .set(Application.SFunc.AVAILABLE_SLOT, null)
+ .set(Application.SFunc.TOTAL_TASK, null)
+ .set(Application.SFunc.JM_MEMORY, null)
+ .set(Application.SFunc.TM_MEMORY, null)
}
update
}
@@ -196,12 +197,12 @@ class FlinkK8sObserverBroker @Autowired() (
// Update metrics info of the corresponding Application record
substream.mapZIO { case (trackKey: ApplicationJobKey, metrics:
ClusterMetrics) =>
safeUpdateApplicationRecord(trackKey.id)(
- lambdaUpdate[Application]
- .typedSet(_.getJmMemory, metrics.totalJmMemory)
- .typedSet(_.getTmMemory, metrics.totalTmMemory)
- .typedSet(_.getTotalTM, metrics.totalTm)
- .typedSet(_.getTotalSlot, metrics.totalSlot)
- .typedSet(_.getAvailableSlot, metrics.availableSlot))
+ new LambdaUpdateWrapper[Application]
+ .set(Application.SFunc.JM_MEMORY, metrics.totalJmMemory)
+ .set(Application.SFunc.TM_MEMORY, metrics.totalTmMemory)
+ .set(Application.SFunc.TOTAL_TM, metrics.totalTm)
+ .set(Application.SFunc.TOTAL_SLOT, metrics.totalSlot)
+ .set(Application.SFunc.AVAILABLE_SLOT, metrics.availableSlot))
}
}
.runDrain
@@ -219,9 +220,9 @@ class FlinkK8sObserverBroker @Autowired() (
// Update the corresponding FlinkCluster record
.tap { case (id, state, error) =>
safeUpdateFlinkClusterRecord(id)(
- lambdaUpdate[FlinkCluster]
- .typedSet(_.getClusterState, state.getValue)
- .typedSet(error.isDefined, _.getException, error.get))
+ new LambdaUpdateWrapper[FlinkCluster]
+ .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getValue)
+ .set(error.isDefined, FlinkCluster.SFunc.EXCEPTION, error.get))
}
// Alter for unhealthy state in parallel
.filter { case (_, state, _) => alertClusterStateList.contains(state) }
@@ -258,7 +259,9 @@ class FlinkK8sObserverBroker @Autowired() (
.groupByKey(_._1.id) { case (_, substream) =>
// Update jobManagerUrl of the corresponding Application record
substream.mapZIO { case (key: ApplicationJobKey, endpoint:
RestSvcEndpoint) =>
-
safeUpdateApplicationRecord(key.id)(lambdaUpdate[Application].typedSet(_.getJobManagerUrl,
endpoint.ipRest))
+ safeUpdateApplicationRecord(key.id)(
+ new
LambdaUpdateWrapper[Application].set(Application.SFunc.JOB_MANAGER_URL,
endpoint.ipRest)
+ )
}
}
.runDrain
@@ -274,9 +277,9 @@ class FlinkK8sObserverBroker @Autowired() (
.groupByKey(_._1) { case (_, substream) =>
substream.mapZIO { case (key: ClusterKey, endpoint: RestSvcEndpoint) =>
safeUpdateFlinkClusterRecord(key.id)(
- lambdaUpdate[FlinkCluster]
- .typedSet(_.getAddress, endpoint.ipRest)
- .typedSet(_.getJobManagerUrl, endpoint.ipRest))
+ new LambdaUpdateWrapper[FlinkCluster]
+ .set(FlinkCluster.SFunc.ADDRESS, endpoint.ipRest)
+ .set(FlinkCluster.SFunc.JOB_MANAGER_URL, endpoint.ipRest))
}
}
.runDrain
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
index 69125acd8..e2ed9350f 100644
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.task
import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
import org.apache.streampark.console.core.service.FlinkClusterService
import
org.apache.streampark.console.core.service.application.ApplicationInfoService
-import org.apache.streampark.console.core.utils.MybatisScalaExt.LambdaUpdateOps
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
@@ -46,7 +45,7 @@ trait FlinkK8sObserverBrokerSidecar {
// Update Application record by appId into persistent storage.
protected def safeUpdateApplicationRecord(appId: Long)(update:
LambdaUpdateWrapper[Application]): UIO[Unit] = {
ZIO
- .attemptBlocking(applicationInfoService.update(null,
update.typedEq(_.getId, appId)))
+ .attemptBlocking(applicationInfoService.update(null,
update.eq(Application.SFunc.ID, appId)))
.retryN(2)
.tapError(err => logError(s"Fail to update Application record:
${err.getMessage}"))
.ignore
@@ -63,7 +62,7 @@ trait FlinkK8sObserverBrokerSidecar {
// Update FlinkCluster record by id into persistent storage.
protected def safeUpdateFlinkClusterRecord(id: Long)(update:
LambdaUpdateWrapper[FlinkCluster]): UIO[Unit] = {
ZIO
- .attemptBlocking(flinkClusterService.update(null,
update.typedEq(_.getId, id)))
+ .attemptBlocking(flinkClusterService.update(null,
update.eq(FlinkCluster.SFunc.ID, id)))
.retryN(3)
.tapError(err => logError(s"Fail to update FlinkCluster record:
${err.getMessage}"))
.ignore
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
deleted file mode 100644
index 08a103b32..000000000
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.utils
-
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
-import com.baomidou.mybatisplus.core.toolkit.support.SFunction
-
-import scala.jdk.CollectionConverters._
-import scala.language.implicitConversions
-
-/** MyBatis scala extension. */
-object MybatisScalaExt {
-
- def lambdaQuery[E]: LambdaQueryWrapper[E] = new LambdaQueryWrapper[E]()
- def lambdaUpdate[E]: LambdaUpdateWrapper[E] = new LambdaUpdateWrapper[E]()
-
- // noinspection DuplicatedCode
- implicit class LambdaQueryOps[E](wrapper: LambdaQueryWrapper[E]) {
- def typedIn[V](column: E => V, values: Iterable[V]): LambdaQueryWrapper[E]
= {
- wrapper.in(new SFunction[E, V] { override def apply(t: E): V = column(t)
}, values.asJavaCollection)
- }
-
- def typedEq[V](column: E => V, value: V): LambdaQueryWrapper[E] = {
- wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t)
}, value)
- }
- }
-
- // noinspection DuplicatedCode
- implicit class LambdaUpdateOps[E](wrapper: LambdaUpdateWrapper[E]) {
- def typedSet[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
- wrapper.set((e: E) => column(e), value)
- }
-
- def typedSet[V](cond: Boolean, column: E => V, value: V):
LambdaUpdateWrapper[E] = {
- wrapper.set(cond, new SFunction[E, V] { override def apply(t: E): V =
column(t) }, value)
- }
-
- def typedEq[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
- wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t)
}, value)
- }
- }
-
-}