This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch Improve-version in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit d8c96ec942510a184d42e77f82b06b03839d205d Author: benjobs <[email protected]> AuthorDate: Fri Oct 7 23:49:23 2022 +0800 [Improve] The common module avoids using Flink api --- .../main/scala/org/apache/streampark/common/util/ReflectUtils.scala | 3 +-- .../streampark/flink/connector/doris/internal/DorisSinkWriter.java | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala index fc042e776..03f172281 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala @@ -18,7 +18,6 @@ package org.apache.streampark.common.util import org.apache.commons.lang3.StringUtils -import org.apache.flink.api.scala.ClosureCleaner.LOG import java.lang.reflect.{Field, Modifier} import java.util.Objects @@ -61,7 +60,7 @@ object ReflectUtils extends Logger { field.set(obj, value) catch { case e: IllegalAccessException => - LOG.error("Failed to assign to the element.", e) + logError("Failed to assign to the element.", e) throw new Exception(e.getMessage) } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java index 875eb7582..63d7015c7 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java @@ -18,13 +18,13 @@ package org.apache.streampark.flink.connector.doris.internal; import org.apache.streampark.common.enums.Semantic; +import org.apache.streampark.common.util.ThreadUtils; import org.apache.streampark.connector.doris.conf.DorisConfig; import org.apache.streampark.flink.connector.doris.bean.DorisSinkBufferEntry; import org.apache.streampark.flink.connector.doris.bean.LoadStatusFailedException; import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.metrics.Counter; -import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +120,7 @@ public class DorisSinkWriter implements Serializable { return; } stopSchedule(); - this.schedule = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-interval-sink")); + this.schedule = Executors.newScheduledThreadPool(1, ThreadUtils.threadFactory("doris-interval-sink")); this.scheduledFuture = this.schedule.schedule(() -> { synchronized (DorisSinkWriter.this) { if (!closed) {
