This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
     new dfd01a264 scala 2.11 code style improvement
dfd01a264 is described below

commit dfd01a2642e8b2c14617121f51e6ff4fb47bc23c
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 7 12:39:44 2023 +0800

    scala 2.11 code style improvement
---
 .../org/apache/streampark/flink/core/scala/DataStreamExt.scala | 10 +++++++---
 .../apache/streampark/flink/proxy/ChildFirstClassLoader.scala  |  8 +++++++-
 2 files changed, 14 insertions(+), 4 deletions(-)

diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
index 5e2805aa1..c1e925e0d 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
@@ -38,11 +38,15 @@ object DataStreamExt {
   class DataStream[T: TypeInformation](dataStream: DStream[T]) {
 
     def sideOut(fun: (T, ProcFunc[T, T]#Context) => Unit): DStream[T] =
-      dataStream.process(
-        (value: T, ctx: ProcFunc[T, T]#Context, out: Collector[T]) => {
+      dataStream.process(new ProcFunc[T, T] {
+        override def processElement(
+            value: T,
+            ctx: ProcFunc[T, T]#Context,
+            out: Collector[T]): Unit = {
           fun(value, ctx)
           out.collect(value)
-        })
+        }
+      })
 
     def sideGet[R: TypeInformation](sideTag: String): DStream[R] =
       dataStream.getSideOutput(new OutputTag[R](sideTag))
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 2d8f5cde3..7db1fbe90 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -43,7 +43,13 @@ class ChildFirstClassLoader(
   ClassLoader.registerAsParallelCapable()
 
   def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern: Pattern) {
-    this(urls, parent, flinkResourcePattern, (_: Throwable) => {})
+    this(
+      urls,
+      parent,
+      flinkResourcePattern,
+      new Consumer[Throwable] {
+        override def accept(t: Throwable): Unit = {}
+      })
   }
 
   private val FLINK_PATTERN =

Reply via email to