This is an automated email from the ASF dual-hosted git repository.
yangzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new bc4f1f39f [Improve] The common module avoids using Flink api (#1763)
bc4f1f39f is described below
commit bc4f1f39f68f739d6e1153ae26d12062539e0f47
Author: benjobs <[email protected]>
AuthorDate: Sat Oct 8 00:05:59 2022 +0800
[Improve] The common module avoids using Flink api (#1763)
---
.../main/scala/org/apache/streampark/common/util/ReflectUtils.scala | 3 +--
.../streampark/flink/connector/doris/internal/DorisSinkWriter.java | 4 ++--
2 files changed, 3 insertions(+), 4 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
index fc042e776..03f172281 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ReflectUtils.scala
@@ -18,7 +18,6 @@
package org.apache.streampark.common.util
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.api.scala.ClosureCleaner.LOG
import java.lang.reflect.{Field, Modifier}
import java.util.Objects
@@ -61,7 +60,7 @@ object ReflectUtils extends Logger {
field.set(obj, value)
catch {
case e: IllegalAccessException =>
- LOG.error("Failed to assign to the element.", e)
+ logError("Failed to assign to the element.", e)
throw new Exception(e.getMessage)
}
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
index 875eb7582..63d7015c7 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
@@ -18,13 +18,13 @@
package org.apache.streampark.flink.connector.doris.internal;
import org.apache.streampark.common.enums.Semantic;
+import org.apache.streampark.common.util.ThreadUtils;
import org.apache.streampark.connector.doris.conf.DorisConfig;
import org.apache.streampark.flink.connector.doris.bean.DorisSinkBufferEntry;
import org.apache.streampark.flink.connector.doris.bean.LoadStatusFailedException;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
-import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,7 +120,7 @@ public class DorisSinkWriter implements Serializable {
return;
}
stopSchedule();
- this.schedule = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-interval-sink"));
+ this.schedule = Executors.newScheduledThreadPool(1, ThreadUtils.threadFactory("doris-interval-sink"));
this.scheduledFuture = this.schedule.schedule(() -> {
synchronized (DorisSinkWriter.this) {
if (!closed) {