This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.2
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.2 by this push:
new dfd01a264 scala 2.11 code style improvement
dfd01a264 is described below
commit dfd01a2642e8b2c14617121f51e6ff4fb47bc23c
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 7 12:39:44 2023 +0800
scala 2.11 code style improvement
---
.../org/apache/streampark/flink/core/scala/DataStreamExt.scala | 10 +++++++---
.../apache/streampark/flink/proxy/ChildFirstClassLoader.scala | 8 +++++++-
2 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
index 5e2805aa1..c1e925e0d 100644
---
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
+++
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/DataStreamExt.scala
@@ -38,11 +38,15 @@ object DataStreamExt {
class DataStream[T: TypeInformation](dataStream: DStream[T]) {
def sideOut(fun: (T, ProcFunc[T, T]#Context) => Unit): DStream[T] =
- dataStream.process(
- (value: T, ctx: ProcFunc[T, T]#Context, out: Collector[T]) => {
+ dataStream.process(new ProcFunc[T, T] {
+ override def processElement(
+ value: T,
+ ctx: ProcFunc[T, T]#Context,
+ out: Collector[T]): Unit = {
fun(value, ctx)
out.collect(value)
- })
+ }
+ })
def sideGet[R: TypeInformation](sideTag: String): DStream[R] =
dataStream.getSideOutput(new OutputTag[R](sideTag))
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
index 2d8f5cde3..7db1fbe90 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/ChildFirstClassLoader.scala
@@ -43,7 +43,13 @@ class ChildFirstClassLoader(
ClassLoader.registerAsParallelCapable()
def this(urls: Array[URL], parent: ClassLoader, flinkResourcePattern:
Pattern) {
- this(urls, parent, flinkResourcePattern, (_: Throwable) => {})
+ this(
+ urls,
+ parent,
+ flinkResourcePattern,
+ new Consumer[Throwable] {
+ override def accept(t: Throwable): Unit = {}
+ })
}
private val FLINK_PATTERN =