This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9502711cc [Fix] Fix the issue of Scala calling Mybatis LambdaWrapper with Java function reference (#3164)
9502711cc is described below

commit 9502711cca4488bbaa3a974897570ab6671c6456
Author: Linying Assad <[email protected]>
AuthorDate: Fri Sep 22 22:06:47 2023 +0800

    [Fix] Fix the issue of Scala calling Mybatis LambdaWrapper with Java function reference (#3164)
    
    * [feat][flink-k8s-v2] Migrate the creation process of the flink deploy ingress to the Flink kubernetes operator (#2879)
    
    * Fix the error type conversion of streampark.flink-k8s.enable-v2.
    
    * Fix the issue of Scala calling Mybatis LambdaWrapper with Java function reference parameters.
    
    * Resolve conflict
    
    * Format code
---
 .../streampark/common/conf/K8sFlinkConfig.scala    |  2 +-
 .../console/core/entity/Application.java           | 22 +++++++
 .../console/core/entity/FlinkCluster.java          | 13 +++++
 .../console/core/task/FlinkK8sObserverBroker.scala | 67 +++++++++++-----------
 .../core/task/FlinkK8sObserverBrokerSidecar.scala  |  5 +-
 .../console/core/utils/MybatisScalaExt.scala       | 59 -------------------
 6 files changed, 73 insertions(+), 95 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index f5a20923a..39f3fc3d4 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -26,7 +26,7 @@ object K8sFlinkConfig {
   val ENABLE_V2: InternalOption = InternalOption(
     key = "streampark.flink-k8s.enable-v2",
     defaultValue = false,
-    classType = classOf[Boolean],
+    classType = classOf[java.lang.Boolean],
     description =
       "Whether to enable the v2 version(base on flink-kubernetes-operator) of 
flink kubernetes operation"
   )
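
A note on the hunk above (context, not part of the patch): in Scala, classOf[Boolean]
denotes the JVM primitive class boolean (java.lang.Boolean.TYPE), not the boxed
java.lang.Boolean, while a parsed config value is a boxed object; that mismatch is
presumably the type-conversion error this hunk fixes. A minimal sketch of the distinction:

    // Scala sketch, illustrative only:
    classOf[Boolean].getName                                       // "boolean" (the primitive class)
    classOf[java.lang.Boolean].getName                             // "java.lang.Boolean" (the boxed class)
    classOf[Boolean].isInstance(java.lang.Boolean.TRUE)            // false: boxed value vs primitive class
    classOf[java.lang.Boolean].isInstance(java.lang.Boolean.TRUE)  // true
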
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 0c1593662..6299bcb24 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -44,6 +44,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import lombok.Data;
 import lombok.SneakyThrows;
@@ -587,4 +588,25 @@ public class Application implements Serializable {
   public int hashCode() {
     return Objects.hash(id);
   }
+
+  public static class SFunc {
+    public static final SFunction<Application, Long> ID = Application::getId;
+    public static final SFunction<Application, String> JOB_ID = Application::getJobId;
+    public static final SFunction<Application, Date> START_TIME = Application::getStartTime;
+    public static final SFunction<Application, Date> END_TIME = Application::getEndTime;
+    public static final SFunction<Application, Long> DURATION = Application::getDuration;
+    public static final SFunction<Application, Integer> TOTAL_TASK = Application::getTotalTask;
+    public static final SFunction<Application, Integer> TOTAL_TM = Application::getTotalTM;
+    public static final SFunction<Application, Integer> TOTAL_SLOT = Application::getTotalSlot;
+    public static final SFunction<Application, Integer> JM_MEMORY = Application::getJmMemory;
+    public static final SFunction<Application, Integer> TM_MEMORY = Application::getTmMemory;
+    public static final SFunction<Application, Integer> STATE = Application::getState;
+    public static final SFunction<Application, String> OPTIONS = Application::getOptions;
+    public static final SFunction<Application, Integer> AVAILABLE_SLOT =
+        Application::getAvailableSlot;
+    public static final SFunction<Application, Integer> EXECUTION_MODE =
+        Application::getExecutionMode;
+    public static final SFunction<Application, String> JOB_MANAGER_URL =
+        Application::getJobManagerUrl;
+  }
 }
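
Context for the SFunc holder above: MyBatis-Plus lambda wrappers resolve a column
name by introspecting the java.lang.invoke.SerializedLambda behind an SFunction,
which works only for genuine Java method references such as Application::getId.
An SFunction produced from Scala (a function literal, or an anonymous subclass as
in the MybatisScalaExt removed at the end of this diff) yields either a synthetic
implementation-method name or no SerializedLambda at all, so column resolution
fails at runtime. Keeping the method references in a Java-side constant class lets
Scala callers use the wrappers directly. A hedged usage sketch from Scala, using
only names visible in this diff:

    import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper

    // Works: the constant is a real Java method reference, so MyBatis-Plus can
    // map its implementation method "getState" to the "state" column.
    val update = new LambdaUpdateWrapper[Application]()
      .set(Application.SFunc.STATE, Integer.valueOf(5))

    // Fails at runtime: a Scala function literal coerced to SFunction carries
    // no getter name that the wrapper can recover.
    //   new LambdaUpdateWrapper[Application]().set((a: Application) => a.getState, 5)
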
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 6243d6f55..a09727e2c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -36,6 +36,7 @@ import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableField;
 import com.baomidou.mybatisplus.annotation.TableId;
 import com.baomidou.mybatisplus.annotation.TableName;
+import com.baomidou.mybatisplus.core.toolkit.support.SFunction;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -186,4 +187,16 @@ public class FlinkCluster implements Serializable {
     }
     return map;
   }
+
+  public static class SFunc {
+    public static final SFunction<FlinkCluster, Long> ID = FlinkCluster::getId;
+    public static final SFunction<FlinkCluster, String> ADDRESS = FlinkCluster::getAddress;
+    public static final SFunction<FlinkCluster, String> JOB_MANAGER_URL =
+        FlinkCluster::getJobManagerUrl;
+    public static final SFunction<FlinkCluster, Integer> CLUSTER_STATE =
+        FlinkCluster::getClusterState;
+    public static final SFunction<FlinkCluster, Integer> EXECUTION_MODE =
+        FlinkCluster::getExecutionMode;
+    public static final SFunction<FlinkCluster, String> EXCEPTION = FlinkCluster::getException;
+  }
 }
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
index 66251914c..47ef5c536 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -30,13 +30,14 @@ import org.apache.streampark.console.core.service.alert.AlertService
 import org.apache.streampark.console.core.service.application.ApplicationInfoService
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter
 import org.apache.streampark.console.core.utils.FlinkK8sDataTypeConverter.{applicationToTrackKey, clusterMetricsToFlinkMetricCV, flinkClusterToClusterKey, k8sDeployStateToClusterState}
-import org.apache.streampark.console.core.utils.MybatisScalaExt.{lambdaQuery, lambdaUpdate, LambdaQueryOps, LambdaUpdateOps}
 import org.apache.streampark.flink.kubernetes.model.FlinkMetricCV
 import org.apache.streampark.flink.kubernetes.v2.model._
 import org.apache.streampark.flink.kubernetes.v2.model.TrackKey.{ApplicationJobKey, ClusterKey}
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserver
 import org.apache.streampark.flink.kubernetes.v2.observer.FlinkK8sObserverSnapSubscriptionHelper.{ClusterMetricsSnapsSubscriptionOps, DeployCRSnapsSubscriptionOps, RestSvcEndpointSnapsSubscriptionOps}
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.stereotype.Component
 import zio.{Fiber, Ref, UIO, ZIO}
@@ -49,8 +50,6 @@ import javax.annotation.{PostConstruct, PreDestroy}
 import java.lang
 import java.util.Date
 
-import scala.jdk.CollectionConverters.collectionAsScalaIterableConverter
-
 /** Broker of FlinkK8sObserver which is the observer for Flink on Kubernetes */
 @Component
 class FlinkK8sObserverBroker @Autowired() (
@@ -113,7 +112,8 @@ class FlinkK8sObserverBroker @Autowired() (
 
     val fromApplicationRecords: UIO[Unit] = it
       .safeFindApplication(
-        lambdaQuery[Application].typedIn(_.getExecutionMode, ExecutionMode.getKubernetesMode.asScala)
+        new LambdaQueryWrapper[Application]
+          .in(Application.SFunc.EXECUTION_MODE, ExecutionMode.getKubernetesMode)
       )(10)
      .map(apps => apps.map(app => applicationToTrackKey(app)).filterSome.toVector)
      .tap(keys => logInfo(s"Restore Flink K8s track-keys from Application records:\n${keys.prettyStr}"))
@@ -121,7 +121,8 @@ class FlinkK8sObserverBroker @Autowired() (
 
     val fromFlinkClusterRecords: UIO[Unit] = it
       .safeFindFlinkClusterRecord(
-        lambdaQuery[FlinkCluster].typedEq(_.getExecutionMode, ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
+        new LambdaQueryWrapper[FlinkCluster]
+          .eq(FlinkCluster.SFunc.EXECUTION_MODE, ExecutionMode.KUBERNETES_NATIVE_SESSION.getMode)
       )(10)
      .map(clusters => clusters.map(e => TrackKey.cluster(e.getId, e.getK8sNamespace, e.getClusterId)))
      .tap(keys => logInfo(s"Restore Flink K8s track-keys from FlinkCluster records:\n${keys.prettyStr}"))
@@ -140,27 +141,27 @@ class FlinkK8sObserverBroker @Autowired() (
       .tap { case (snap: JobSnapshot, convertedState: FlinkAppState) =>
         safeUpdateApplicationRecord(snap.appId) {
 
-          var update = lambdaUpdate[Application]
-            .typedSet(_.getState, convertedState.getValue)
-            .typedSet(_.getOptions, OptionState.NONE.getValue)
+          var update = new LambdaUpdateWrapper[Application]
+            .set(Application.SFunc.STATE, convertedState.getValue)
+            .set(Application.SFunc.OPTIONS, OptionState.NONE.getValue)
           // update JobStatus related columns
           snap.jobStatus.foreach { status =>
             update = update
-              .typedSet(_.getJobId, status.jobId)
-              .typedSet(_.getStartTime, new Date(status.startTs))
-              .typedSet(_.getEndTime, status.endTs.map(new Date(_)).orNull)
-              .typedSet(_.getDuration, status.duration)
-              .typedSet(_.getTotalTask, status.tasks.map(_.total).getOrElse(0))
+              .set(Application.SFunc.JOB_ID, status.jobId)
+              .set(Application.SFunc.START_TIME, new Date(status.startTs))
+              .set(Application.SFunc.END_TIME, status.endTs.map(new Date(_)).orNull)
+              .set(Application.SFunc.DURATION, status.duration)
+              .set(Application.SFunc.TOTAL_TASK, status.tasks.map(_.total).getOrElse(0))
           }
          // Copy the logic from resources/mapper/core/ApplicationMapper.xml:persistMetrics
           if (FlinkAppState.isEndState(convertedState.getValue)) {
             update = update
-              .typedSet(_.getTotalTM, null)
-              .typedSet(_.getTotalSlot, null)
-              .typedSet(_.getTotalSlot, null)
-              .typedSet(_.getAvailableSlot, null)
-              .typedSet(_.getJmMemory, null)
-              .typedSet(_.getTmMemory, null)
+              .set(Application.SFunc.TOTAL_TM, null)
+              .set(Application.SFunc.TOTAL_SLOT, null)
+              .set(Application.SFunc.AVAILABLE_SLOT, null)
+              .set(Application.SFunc.TOTAL_TASK, null)
+              .set(Application.SFunc.JM_MEMORY, null)
+              .set(Application.SFunc.TM_MEMORY, null)
           }
           update
         }
@@ -196,12 +197,12 @@ class FlinkK8sObserverBroker @Autowired() (
         // Update metrics info of the corresponding Application record
        substream.mapZIO { case (trackKey: ApplicationJobKey, metrics: ClusterMetrics) =>
           safeUpdateApplicationRecord(trackKey.id)(
-            lambdaUpdate[Application]
-              .typedSet(_.getJmMemory, metrics.totalJmMemory)
-              .typedSet(_.getTmMemory, metrics.totalTmMemory)
-              .typedSet(_.getTotalTM, metrics.totalTm)
-              .typedSet(_.getTotalSlot, metrics.totalSlot)
-              .typedSet(_.getAvailableSlot, metrics.availableSlot))
+            new LambdaUpdateWrapper[Application]
+              .set(Application.SFunc.JM_MEMORY, metrics.totalJmMemory)
+              .set(Application.SFunc.TM_MEMORY, metrics.totalTmMemory)
+              .set(Application.SFunc.TOTAL_TM, metrics.totalTm)
+              .set(Application.SFunc.TOTAL_SLOT, metrics.totalSlot)
+              .set(Application.SFunc.AVAILABLE_SLOT, metrics.availableSlot))
         }
       }
       .runDrain
@@ -219,9 +220,9 @@ class FlinkK8sObserverBroker @Autowired() (
         // Update the corresponding FlinkCluster record
         .tap { case (id, state, error) =>
           safeUpdateFlinkClusterRecord(id)(
-            lambdaUpdate[FlinkCluster]
-              .typedSet(_.getClusterState, state.getValue)
-              .typedSet(error.isDefined, _.getException, error.get))
+            new LambdaUpdateWrapper[FlinkCluster]
+              .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getValue)
+              .set(error.isDefined, FlinkCluster.SFunc.EXCEPTION, error.get))
         }
         // Alter for unhealthy state in parallel
         .filter { case (_, state, _) => alertClusterStateList.contains(state) }
@@ -258,7 +259,9 @@ class FlinkK8sObserverBroker @Autowired() (
       .groupByKey(_._1.id) { case (_, substream) =>
         // Update jobManagerUrl of the corresponding Application record
        substream.mapZIO { case (key: ApplicationJobKey, endpoint: RestSvcEndpoint) =>
-          safeUpdateApplicationRecord(key.id)(lambdaUpdate[Application].typedSet(_.getJobManagerUrl, endpoint.ipRest))
+          safeUpdateApplicationRecord(key.id)(
+            new LambdaUpdateWrapper[Application].set(Application.SFunc.JOB_MANAGER_URL, endpoint.ipRest)
+          )
         }
       }
       .runDrain
@@ -274,9 +277,9 @@ class FlinkK8sObserverBroker @Autowired() (
       .groupByKey(_._1) { case (_, substream) =>
         substream.mapZIO { case (key: ClusterKey, endpoint: RestSvcEndpoint) =>
           safeUpdateFlinkClusterRecord(key.id)(
-            lambdaUpdate[FlinkCluster]
-              .typedSet(_.getAddress, endpoint.ipRest)
-              .typedSet(_.getJobManagerUrl, endpoint.ipRest))
+            new LambdaUpdateWrapper[FlinkCluster]
+              .set(FlinkCluster.SFunc.ADDRESS, endpoint.ipRest)
+              .set(FlinkCluster.SFunc.JOB_MANAGER_URL, endpoint.ipRest))
         }
       }
       .runDrain
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
index 69125acd8..e2ed9350f 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBrokerSidecar.scala
@@ -20,7 +20,6 @@ package org.apache.streampark.console.core.task
 import org.apache.streampark.console.core.entity.{Application, FlinkCluster}
 import org.apache.streampark.console.core.service.FlinkClusterService
 import org.apache.streampark.console.core.service.application.ApplicationInfoService
-import org.apache.streampark.console.core.utils.MybatisScalaExt.LambdaUpdateOps
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
@@ -46,7 +45,7 @@ trait FlinkK8sObserverBrokerSidecar {
   // Update Application record by appId into persistent storage.
   protected def safeUpdateApplicationRecord(appId: Long)(update: LambdaUpdateWrapper[Application]): UIO[Unit] = {
     ZIO
-      .attemptBlocking(applicationInfoService.update(null, update.typedEq(_.getId, appId)))
+      .attemptBlocking(applicationInfoService.update(null, update.eq(Application.SFunc.ID, appId)))
       .retryN(2)
      .tapError(err => logError(s"Fail to update Application record: ${err.getMessage}"))
       .ignore
@@ -63,7 +62,7 @@ trait FlinkK8sObserverBrokerSidecar {
   // Update FlinkCluster record by id into persistent storage.
   protected def safeUpdateFlinkClusterRecord(id: Long)(update: LambdaUpdateWrapper[FlinkCluster]): UIO[Unit] = {
     ZIO
-      .attemptBlocking(flinkClusterService.update(null, update.typedEq(_.getId, id)))
+      .attemptBlocking(flinkClusterService.update(null, update.eq(FlinkCluster.SFunc.ID, id)))
       .retryN(3)
      .tapError(err => logError(s"Fail to update FlinkCluster record: ${err.getMessage}"))
       .ignore
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
deleted file mode 100644
index 08a103b32..000000000
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/utils/MybatisScalaExt.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.utils
-
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper
-import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper
-import com.baomidou.mybatisplus.core.toolkit.support.SFunction
-
-import scala.jdk.CollectionConverters._
-import scala.language.implicitConversions
-
-/** MyBatis scala extension. */
-object MybatisScalaExt {
-
-  def lambdaQuery[E]: LambdaQueryWrapper[E]   = new LambdaQueryWrapper[E]()
-  def lambdaUpdate[E]: LambdaUpdateWrapper[E] = new LambdaUpdateWrapper[E]()
-
-  // noinspection DuplicatedCode
-  implicit class LambdaQueryOps[E](wrapper: LambdaQueryWrapper[E]) {
-    def typedIn[V](column: E => V, values: Iterable[V]): LambdaQueryWrapper[E] = {
-      wrapper.in(new SFunction[E, V] { override def apply(t: E): V = column(t) }, values.asJavaCollection)
-    }
-
-    def typedEq[V](column: E => V, value: V): LambdaQueryWrapper[E] = {
-      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
-    }
-  }
-
-  // noinspection DuplicatedCode
-  implicit class LambdaUpdateOps[E](wrapper: LambdaUpdateWrapper[E]) {
-    def typedSet[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
-      wrapper.set((e: E) => column(e), value)
-    }
-
-    def typedSet[V](cond: Boolean, column: E => V, value: V): LambdaUpdateWrapper[E] = {
-      wrapper.set(cond, new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
-    }
-
-    def typedEq[V](column: E => V, value: V): LambdaUpdateWrapper[E] = {
-      wrapper.eq(new SFunction[E, V] { override def apply(t: E): V = column(t) }, value)
-    }
-  }
-
-}
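
For the record, the bridge deleted above wrapped Scala closures in hand-written
SFunction instances (new SFunction[E, V] { ... }), and such anonymous classes
expose no writeReplace method producing a SerializedLambda, which is what
MyBatis-Plus deserializes to recover the getter name. A hedged sketch of that
assumed mechanism (simplified; the real logic lives in MyBatis-Plus internals):

    import java.lang.invoke.SerializedLambda
    import com.baomidou.mybatisplus.core.toolkit.support.SFunction

    def implMethodName[T, R](f: SFunction[T, R]): String = {
      // A serializable Java method reference has a synthetic writeReplace()
      // returning a SerializedLambda; for Application::getId it reports "getId".
      val m = f.getClass.getDeclaredMethod("writeReplace")
      m.setAccessible(true)
      m.invoke(f).asInstanceOf[SerializedLambda].getImplMethodName
    }

    // implMethodName(Application.SFunc.ID)  // "getId", mapped to column "id"
    // For `new SFunction[E, V] { ... }` getDeclaredMethod("writeReplace") throws
    // NoSuchMethodException, and a Scala lambda reports a synthetic
    // "$anonfun$..." name: either way, no column name can be derived.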
