EricJoy2048 commented on code in PR #3850:
URL: https://github.com/apache/incubator-seatunnel/pull/3850#discussion_r1104088450
##########
seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/client/DorisSinkManager.java:
##########
@@ -76,10 +90,64 @@ private void tryInit() throws IOException {
}, batchIntervalMs, batchIntervalMs, TimeUnit.MILLISECONDS);
}
+ public void startAsyncFlushing() {
+ // start flush thread
+ Thread flushThread = new Thread(() -> {
+ while (true) {
+ try {
+ if (!asyncFlush()) {
+ log.info("doris flush thread is about to exit.");
+ asyncFlushThreadAlive = false;
+ break;
+ }
+ } catch (Exception e) {
+ flushException = e;
+ }
+ }
+ });
+
+ flushThread.setUncaughtExceptionHandler((t, e) -> {
+ log.error("doris flush thread uncaught exception occurred: " + e.getMessage(), e);
+ flushException = (Exception) e;
+ asyncFlushThreadAlive = false;
+ });
+ flushThread.setName("doris-flush");
+ flushThread.setDaemon(true);
+ flushThread.start();
+ asyncFlushThreadAlive = true;
+ }
+
+ private boolean asyncFlush() throws Exception {
+ DorisFlushTuple flushData = flushQueue.poll(FLUSH_QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
+
+ if (flushData == null || (0 == flushData.getBytes() && !flushData.isEof())) {
+ return true;
+ }
+ if (flushData.isEof()) {
+ return false;
+ }
+ flush(flushData);
+ log.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
+ //remove finished flushData from previousBufferMap
+ previousBufferMap.remove(flushData.getLabel());
+
+ return true;
+ }
+
public synchronized void write(String record) throws IOException {
- tryInit();
checkFlushException();
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
+
+ if (sinkSemantics.equals(DorisSinkSemantics.AT_LEAST_ONCE)) {
+ if (currentSinkBuffer == null) {
+ String newLabel = createBatchLabel();
+ currentSinkBuffer = new DorisFlushTuple(newLabel);
+ }
+ currentSinkBuffer.addToBuffer(bts);
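Review Comment:
If we use the `JDBC Source`, this may cause an OOM here. In `AT_LEAST_ONCE` mode every record is appended to `currentSinkBuffer` in memory, and buffers are only removed from `previousBufferMap` after their stream load finishes, so a source that emits a large volume of data can exhaust the heap. I don't think keeping this data in memory is a good approach; we should discuss whether to add state saving backed by something like RocksDB to persist the state information.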
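A minimal sketch of one interim mitigation, assuming a hypothetical `BoundedSinkBuffer` class with a byte threshold (`maxBatchBytes`) and a bounded queue (`queueCapacity`), none of which are part of this PR: the writer hands a batch off once it reaches the threshold, and because the hand-off uses `BlockingQueue.put()` on an `ArrayBlockingQueue`, a fast source is blocked (backpressure) when the flush thread falls behind instead of growing the heap. This only bounds memory; it does not provide the durable, recoverable state that a RocksDB-style backend would.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical sketch (not the PR's code): accumulates records into a batch
 * and hands the batch to a bounded queue once it reaches maxBatchBytes.
 * Because the queue has a fixed capacity, put() blocks the writer when the
 * flush thread falls behind, bounding how many batches wait in memory.
 */
final class BoundedSinkBuffer {
    private final long maxBatchBytes;
    private final BlockingQueue<List<byte[]>> flushQueue;
    private List<byte[]> currentBatch = new ArrayList<>();
    private long currentBytes = 0;

    BoundedSinkBuffer(long maxBatchBytes, int queueCapacity) {
        this.maxBatchBytes = maxBatchBytes;
        this.flushQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    /** Appends one record; blocks if the batch is full and the queue has no room. */
    synchronized void write(String record) throws InterruptedException {
        byte[] bts = record.getBytes(StandardCharsets.UTF_8);
        currentBatch.add(bts);
        currentBytes += bts.length;
        if (currentBytes >= maxBatchBytes) {
            handOff();
        }
    }

    /** Moves the current batch (if non-empty) to the flush queue. */
    synchronized void handOff() throws InterruptedException {
        if (currentBatch.isEmpty()) {
            return; // nothing buffered, nothing to hand off
        }
        flushQueue.put(currentBatch); // blocks while the queue is full: backpressure
        currentBatch = new ArrayList<>();
        currentBytes = 0;
    }

    /** A flush thread would call queue().take() to drain batches. */
    BlockingQueue<List<byte[]>> queue() {
        return flushQueue;
    }

    synchronized long bufferedBytes() {
        return currentBytes;
    }

    public static void main(String[] args) throws InterruptedException {
        BoundedSinkBuffer buf = new BoundedSinkBuffer(10, 2);
        buf.write("hello");   // 5 bytes, stays buffered
        buf.write("world!");  // 11 bytes total >= 10, batch handed off
        System.out.println(buf.queue().size());  // prints 1
        System.out.println(buf.bufferedBytes()); // prints 0
    }
}
```

Holding the object's monitor while `put()` blocks mirrors the `synchronized` `write()` in the diff; a flush thread that drains `queue()` does not take that monitor, so it can still make progress and unblock the writer.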