This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new dbc31b8c6 [Improve] doris datastream-connector code style improvement
dbc31b8c6 is described below

commit dbc31b8c6c223121834aace36d1098427674566c
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 18:26:09 2025 +0800

    [Improve] doris datastream-connector code style improvement
---
 .../connector/doris/internal/DorisSinkFunction.java   | 18 +++++++++---------
 .../connector/doris/internal/DorisSinkWriter.java     | 12 ++++++------
 .../connector/doris/util/DorisDelimiterParser.java    | 19 ++++++++++++-------
 3 files changed, 27 insertions(+), 22 deletions(-)

diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
index 5018d2451..0d273d65e 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;
 
@@ -46,18 +47,17 @@ import java.util.Properties;
 public class DorisSinkFunction<T> extends RichSinkFunction<T> implements 
CheckpointedFunction {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(DorisSinkFunction.class);
-  private final Properties properties;
   private final DorisSinkWriter dorisSinkWriter;
   private final DorisConfig dorisConfig;
   // state only works with `EXACTLY_ONCE`
-  private transient ListState<Map<String, DorisSinkBufferEntry>> 
checkpointedState;
+  private transient ListState<Map<String, DorisSinkBufferEntry>> 
checkpointState;
   private transient Counter totalInvokeRowsTime;
   private transient Counter totalInvokeRows;
   private static final String COUNTER_INVOKE_ROWS_COST_TIME = 
"totalInvokeRowsTimeNs";
   private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";
 
   public DorisSinkFunction(StreamingContext context) {
-    this.properties = context.parameter().getProperties();
+    Properties properties = context.parameter().getProperties();
     this.dorisConfig = new DorisConfig(properties);
     this.dorisSinkWriter = new DorisSinkWriter(dorisConfig);
   }
@@ -83,8 +83,8 @@ public class DorisSinkFunction<T> extends RichSinkFunction<T> 
implements Checkpo
           || null == data.getDataRows()) {
         LOGGER.warn(
             String.format(
-                " row data not fullfilled. {database: %s, table: %s, dataRows: 
%s}",
-                data.getDatabase(), data.getTable(), data.getDataRows()));
+                " row data not fulfilled. {database: %s, table: %s, dataRows: 
%s}",
+                data.getDatabase(), data.getTable(), 
Arrays.toString(data.getDataRows())));
         return;
       }
       dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), 
data.getDataRows());
@@ -110,7 +110,7 @@ public class DorisSinkFunction<T> extends 
RichSinkFunction<T> implements Checkpo
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
     if (Semantic.EXACTLY_ONCE.equals(Semantic.of(dorisConfig.semantic()))) {
       // save state
-      checkpointedState.add(dorisSinkWriter.getBufferedBatchMap());
+      checkpointState.add(dorisSinkWriter.getBufferedBatchMap());
       flushPreviousState();
     }
   }
@@ -122,16 +122,16 @@ public class DorisSinkFunction<T> extends 
RichSinkFunction<T> implements Checkpo
           new ListStateDescriptor<>(
               "buffered-rows",
               TypeInformation.of(new TypeHint<Map<String, 
DorisSinkBufferEntry>>() {}));
-      checkpointedState = 
context.getOperatorStateStore().getListState(descriptor);
+      checkpointState = 
context.getOperatorStateStore().getListState(descriptor);
     }
   }
 
   private void flushPreviousState() throws Exception {
     // flush the batch saved at the previous checkpoint
-    for (Map<String, DorisSinkBufferEntry> state : checkpointedState.get()) {
+    for (Map<String, DorisSinkBufferEntry> state : checkpointState.get()) {
       dorisSinkWriter.setBufferedBatchMap(state);
       dorisSinkWriter.flush(null, true);
     }
-    checkpointedState.clear();
+    checkpointState.clear();
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
index 160b32047..aa063bf1b 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
@@ -59,7 +59,6 @@ public class DorisSinkWriter implements Serializable {
   private transient Counter totalFlushFailedTimes;
 
   private final Map<String, DorisSinkBufferEntry> bufferMap = new 
ConcurrentHashMap<>();
-  private final Long timeout = 3000L;
   private volatile boolean closed = false;
   private volatile boolean flushThreadAlive = false;
   private volatile Throwable flushException;
@@ -239,7 +238,8 @@ public class DorisSinkWriter implements Serializable {
   }
 
   private boolean asyncFlush() throws Exception {
-    final DorisSinkBufferEntry flushData = flushQueue.poll(timeout, 
TimeUnit.MILLISECONDS);
+    long timeOut = 3000L;
+    final DorisSinkBufferEntry flushData = flushQueue.poll(timeOut, 
TimeUnit.MILLISECONDS);
     if (flushData == null || flushData.getBatchCount() == 0) {
       return true;
     }
@@ -311,13 +311,13 @@ public class DorisSinkWriter implements Serializable {
   private void checkFlushException() {
     if (flushException != null) {
       StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-      for (int i = 0; i < stack.length; i++) {
+      for (StackTraceElement stackTraceElement : stack) {
         LOG.info(
-            stack[i].getClassName()
+            stackTraceElement.getClassName()
                 + "."
-                + stack[i].getMethodName()
+                + stackTraceElement.getMethodName()
                 + " line:"
-                + stack[i].getLineNumber());
+                + stackTraceElement.getLineNumber());
       }
       throw new RuntimeException("Writing records to doris failed.", 
flushException);
     }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
index 1c804b7ae..50e9e4a66 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
@@ -24,12 +24,22 @@ public class DorisDelimiterParser {
   private static final String HEX_STRING = "0123456789ABCDEF";
 
   public static String parse(String sp) throws RuntimeException {
-    if (sp == null || sp.length() == 0) {
+    if (sp == null || sp.isEmpty()) {
       throw new RuntimeException("Delimiter can't be empty");
     }
     if (!sp.toUpperCase().startsWith("\\X")) {
       return sp;
     }
+    String hexStr = getString(sp);
+    // transform to separator
+    StringWriter writer = new StringWriter();
+    for (byte b : hexStrToBytes(hexStr)) {
+      writer.append((char) b);
+    }
+    return writer.toString();
+  }
+
+  private static String getString(String sp) {
     String hexStr = sp.substring(2);
     // check hex str
     if (hexStr.isEmpty()) {
@@ -43,12 +53,7 @@ public class DorisDelimiterParser {
         throw new RuntimeException("Failed to parse delimiter: `Hex str format 
error`");
       }
     }
-    // transform to separator
-    StringWriter writer = new StringWriter();
-    for (byte b : hexStrToBytes(hexStr)) {
-      writer.append((char) b);
-    }
-    return writer.toString();
+    return hexStr;
   }
 
   private static byte[] hexStrToBytes(String hexStr) {

Reply via email to