This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new dbc31b8c6 [Improve] doris datastream-connector code style improvement
dbc31b8c6 is described below
commit dbc31b8c6c223121834aace36d1098427674566c
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 18:26:09 2025 +0800
[Improve] doris datastream-connector code style improvement
---
.../connector/doris/internal/DorisSinkFunction.java | 18 +++++++++---------
.../connector/doris/internal/DorisSinkWriter.java | 12 ++++++------
.../connector/doris/util/DorisDelimiterParser.java | 19 ++++++++++++-------
3 files changed, 27 insertions(+), 22 deletions(-)
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
index 5018d2451..0d273d65e 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java
@@ -39,6 +39,7 @@ import
org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
@@ -46,18 +47,17 @@ import java.util.Properties;
public class DorisSinkFunction<T> extends RichSinkFunction<T> implements
CheckpointedFunction {
private static final Logger LOGGER =
LoggerFactory.getLogger(DorisSinkFunction.class);
- private final Properties properties;
private final DorisSinkWriter dorisSinkWriter;
private final DorisConfig dorisConfig;
// state only works with `EXACTLY_ONCE`
- private transient ListState<Map<String, DorisSinkBufferEntry>>
checkpointedState;
+ private transient ListState<Map<String, DorisSinkBufferEntry>>
checkpointState;
private transient Counter totalInvokeRowsTime;
private transient Counter totalInvokeRows;
private static final String COUNTER_INVOKE_ROWS_COST_TIME =
"totalInvokeRowsTimeNs";
private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";
public DorisSinkFunction(StreamingContext context) {
- this.properties = context.parameter().getProperties();
+ Properties properties = context.parameter().getProperties();
this.dorisConfig = new DorisConfig(properties);
this.dorisSinkWriter = new DorisSinkWriter(dorisConfig);
}
@@ -83,8 +83,8 @@ public class DorisSinkFunction<T> extends RichSinkFunction<T>
implements Checkpo
|| null == data.getDataRows()) {
LOGGER.warn(
String.format(
- " row data not fullfilled. {database: %s, table: %s, dataRows:
%s}",
- data.getDatabase(), data.getTable(), data.getDataRows()));
+ " row data not fulfilled. {database: %s, table: %s, dataRows:
%s}",
+ data.getDatabase(), data.getTable(),
Arrays.toString(data.getDataRows())));
return;
}
dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(),
data.getDataRows());
@@ -110,7 +110,7 @@ public class DorisSinkFunction<T> extends
RichSinkFunction<T> implements Checkpo
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (Semantic.EXACTLY_ONCE.equals(Semantic.of(dorisConfig.semantic()))) {
// save state
- checkpointedState.add(dorisSinkWriter.getBufferedBatchMap());
+ checkpointState.add(dorisSinkWriter.getBufferedBatchMap());
flushPreviousState();
}
}
@@ -122,16 +122,16 @@ public class DorisSinkFunction<T> extends
RichSinkFunction<T> implements Checkpo
new ListStateDescriptor<>(
"buffered-rows",
TypeInformation.of(new TypeHint<Map<String,
DorisSinkBufferEntry>>() {}));
- checkpointedState =
context.getOperatorStateStore().getListState(descriptor);
+ checkpointState =
context.getOperatorStateStore().getListState(descriptor);
}
}
private void flushPreviousState() throws Exception {
// flush the batch saved at the previous checkpoint
- for (Map<String, DorisSinkBufferEntry> state : checkpointedState.get()) {
+ for (Map<String, DorisSinkBufferEntry> state : checkpointState.get()) {
dorisSinkWriter.setBufferedBatchMap(state);
dorisSinkWriter.flush(null, true);
}
- checkpointedState.clear();
+ checkpointState.clear();
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
index 160b32047..aa063bf1b 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java
@@ -59,7 +59,6 @@ public class DorisSinkWriter implements Serializable {
private transient Counter totalFlushFailedTimes;
private final Map<String, DorisSinkBufferEntry> bufferMap = new
ConcurrentHashMap<>();
- private final Long timeout = 3000L;
private volatile boolean closed = false;
private volatile boolean flushThreadAlive = false;
private volatile Throwable flushException;
@@ -239,7 +238,8 @@ public class DorisSinkWriter implements Serializable {
}
private boolean asyncFlush() throws Exception {
- final DorisSinkBufferEntry flushData = flushQueue.poll(timeout,
TimeUnit.MILLISECONDS);
+ long timeOut = 3000L;
+ final DorisSinkBufferEntry flushData = flushQueue.poll(timeOut,
TimeUnit.MILLISECONDS);
if (flushData == null || flushData.getBatchCount() == 0) {
return true;
}
@@ -311,13 +311,13 @@ public class DorisSinkWriter implements Serializable {
private void checkFlushException() {
if (flushException != null) {
StackTraceElement[] stack = Thread.currentThread().getStackTrace();
- for (int i = 0; i < stack.length; i++) {
+ for (StackTraceElement stackTraceElement : stack) {
LOG.info(
- stack[i].getClassName()
+ stackTraceElement.getClassName()
+ "."
- + stack[i].getMethodName()
+ + stackTraceElement.getMethodName()
+ " line:"
- + stack[i].getLineNumber());
+ + stackTraceElement.getLineNumber());
}
throw new RuntimeException("Writing records to doris failed.",
flushException);
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
index 1c804b7ae..50e9e4a66 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java
@@ -24,12 +24,22 @@ public class DorisDelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";
public static String parse(String sp) throws RuntimeException {
- if (sp == null || sp.length() == 0) {
+ if (sp == null || sp.isEmpty()) {
throw new RuntimeException("Delimiter can't be empty");
}
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
+ String hexStr = getString(sp);
+ // transform to separator
+ StringWriter writer = new StringWriter();
+ for (byte b : hexStrToBytes(hexStr)) {
+ writer.append((char) b);
+ }
+ return writer.toString();
+ }
+
+ private static String getString(String sp) {
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
@@ -43,12 +53,7 @@ public class DorisDelimiterParser {
throw new RuntimeException("Failed to parse delimiter: `Hex str format
error`");
}
}
- // transform to separator
- StringWriter writer = new StringWriter();
- for (byte b : hexStrToBytes(hexStr)) {
- writer.append((char) b);
- }
- return writer.toString();
+ return hexStr;
}
private static byte[] hexStrToBytes(String hexStr) {