This is an automated email from the ASF dual-hosted git repository.

yangzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new bc4f1f39f [Improve] The common module avoids using Flink api (#1763)
bc4f1f39f is described below

commit bc4f1f39f68f739d6e1153ae26d12062539e0f47
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 8 00:05:59 2022 +0800

    [Improve] The common module avoids using Flink api (#1763)
---
 .../main/scala/org/apache/streampark/common/util/ReflectUtils.scala   | 3 +--
 .../streampark/flink/connector/doris/internal/DorisSinkWriter.java    | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
index fc042e776..03f172281 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.streampark.common.util
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.scala.ClosureCleaner.LOG
 
 import java.lang.reflect.{Field, Modifier}
 import java.util.Objects
@@ -61,7 +60,7 @@ object ReflectUtils extends Logger {
       field.set(obj, value)
     catch {
       case e: IllegalAccessException =>
-        LOG.error("Failed to assign to the element.", e)
+        logError("Failed to assign to the element.", e)
         throw new Exception(e.getMessage)
     }
   }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
index 875eb7582..63d7015c7 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
@@ -18,13 +18,13 @@
 package org.apache.streampark.flink.connector.doris.internal;
 
 import org.apache.streampark.common.enums.Semantic;
+import org.apache.streampark.common.util.ThreadUtils;
 import org.apache.streampark.connector.doris.conf.DorisConfig;
 import org.apache.streampark.flink.connector.doris.bean.DorisSinkBufferEntry;
 import 
org.apache.streampark.flink.connector.doris.bean.LoadStatusFailedException;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,7 +120,7 @@ public class DorisSinkWriter implements Serializable {
             return;
         }
         stopSchedule();
-        this.schedule = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-interval-sink"));
+        this.schedule = Executors.newScheduledThreadPool(1, 
ThreadUtils.threadFactory("doris-interval-sink"));
         this.scheduledFuture = this.schedule.schedule(() -> {
             synchronized (DorisSinkWriter.this) {
                 if (!closed) {

Reply via email to