This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 411020ab [Improve] Fix the problem caused by very small interval in batch mode (#462)
411020ab is described below
commit 411020ab8e945613ce316dfb895254319d96fbdc
Author: wudi <[email protected]>
AuthorDate: Thu Aug 15 14:28:08 2024 +0800
[Improve] Fix the problem caused by very small interval in batch mode (#462)
---
.../doris/flink/cfg/DorisExecutionOptions.java | 20 +-
.../flink/sink/batch/BatchBufferHttpEntity.java | 76 ++++++++
.../doris/flink/sink/batch/BatchBufferStream.java | 73 ++++++++
.../doris/flink/sink/batch/BatchRecordBuffer.java | 97 ++++------
.../flink/sink/batch/DorisBatchStreamLoad.java | 208 ++++++++++++++++++---
.../doris/flink/sink/batch/DorisBatchWriter.java | 8 +-
.../sink/{batch => copy}/BatchRecordBuffer.java | 2 +-
.../doris/flink/sink/copy/BatchStageLoad.java | 1 -
.../doris/flink/table/DorisConfigOptions.java | 4 +-
.../doris/flink/cfg/DorisExecutionOptionsTest.java | 16 +-
.../apache/doris/flink/sink/DorisSinkITCase.java | 21 ++-
.../sink/batch/TestBatchBufferHttpEntity.java | 46 +++++
.../flink/sink/batch/TestBatchBufferStream.java | 95 ++++++++++
.../flink/sink/batch/TestDorisBatchStreamLoad.java | 80 +++++++-
.../{batch => copy}/TestBatchRecordBuffer.java | 2 +-
.../flink/table/DorisDynamicTableFactoryTest.java | 4 +-
16 files changed, 631 insertions(+), 122 deletions(-)
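For orientation before the diff itself: this commit moves the flush-interval check out of the setter and into Builder.build(), adding matching floors for rows and bytes so all three limits are validated together. A minimal configuration sketch that satisfies the new checks (the values shown are exactly the enforced floors, not tuning advice):

    DorisExecutionOptions executionOptions =
            DorisExecutionOptions.builder()
                    .setBufferFlushIntervalMs(1000)           // must be >= 1000 ms
                    .setBufferFlushMaxRows(10000)             // must be >= 10000 rows
                    .setBufferFlushMaxBytes(10 * 1024 * 1024) // must be >= 10485760 (10 MB)
                    .build();                                 // all three checks now run here

Validating in build() rather than per setter means the limits are enforced once, after every option has been set.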
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 8f3cc240..7ad8ba97 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -40,8 +40,8 @@ public class DorisExecutionOptions implements Serializable {
private static final int DEFAULT_BUFFER_COUNT = 3;
// batch flush
private static final int DEFAULT_FLUSH_QUEUE_SIZE = 2;
- private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 50000;
- private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 10 * 1024 * 1024;
+ private static final int DEFAULT_BUFFER_FLUSH_MAX_ROWS = 500000;
+ private static final int DEFAULT_BUFFER_FLUSH_MAX_BYTES = 100 * 1024 * 1024;
private static final long DEFAULT_BUFFER_FLUSH_INTERVAL_MS = 10 * 1000;
private final int checkInterval;
private final int maxRetries;
@@ -358,9 +358,6 @@ public class DorisExecutionOptions implements Serializable {
}
public Builder setBufferFlushIntervalMs(long bufferFlushIntervalMs) {
- Preconditions.checkState(
- bufferFlushIntervalMs >= 1000,
- "bufferFlushIntervalMs must be greater than or equal to 1 second");
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
return this;
}
@@ -397,6 +394,19 @@ public class DorisExecutionOptions implements Serializable {
&& JSON.equals(streamLoadProp.getProperty(FORMAT_KEY))) {
streamLoadProp.put(READ_JSON_BY_LINE, true);
}
+
+ Preconditions.checkArgument(
+ bufferFlushIntervalMs >= 1000,
+ "bufferFlushIntervalMs must be greater than or equal to 1 second");
+
+ Preconditions.checkArgument(
+ bufferFlushMaxRows >= 10000,
+ "bufferFlushMaxRows must be greater than or equal to 10000");
+
+ Preconditions.checkArgument(
+ bufferFlushMaxBytes >= 10485760,
+ "bufferFlushMaxBytes must be greater than or equal to 10485760(10MB)");
+
return new DorisExecutionOptions(
checkInterval,
maxRetries,
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
new file mode 100644
index 00000000..3c0068eb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferHttpEntity.java
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.apache.http.entity.AbstractHttpEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.List;
+
+public class BatchBufferHttpEntity extends AbstractHttpEntity {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BatchBufferHttpEntity.class);
+ protected static final int OUTPUT_BUFFER_SIZE = 4096;
+ private final List<byte[]> buffer;
+ private final long contentLength;
+
+ public BatchBufferHttpEntity(BatchRecordBuffer recordBuffer) {
+ this.buffer = recordBuffer.getBuffer();
+ this.contentLength = recordBuffer.getBufferSizeBytes();
+ }
+
+ @Override
+ public boolean isRepeatable() {
+ return true;
+ }
+
+ @Override
+ public boolean isChunked() {
+ return false;
+ }
+
+ @Override
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ @Override
+ public InputStream getContent() {
+ return new BatchBufferStream(buffer);
+ }
+
+ @Override
+ public void writeTo(OutputStream outStream) throws IOException {
+ try (InputStream inStream = new BatchBufferStream(buffer)) {
+ final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+ int readLen;
+ while ((readLen = inStream.read(buffer)) != -1) {
+ outStream.write(buffer, 0, readLen);
+ }
+ }
+ }
+
+ @Override
+ public boolean isStreaming() {
+ return false;
+ }
+}
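The entity above hands the LinkedList<byte[]> buffer straight to the HTTP layer, replacing the single large ByteArrayEntity copy removed from DorisBatchStreamLoad further down. A hedged sketch of attaching such an entity to a stock Apache HttpClient request, assuming it lives in (or imports from) the same package; the FE host, database, and table are placeholders, and the connector itself builds the request through HttpPutBuilder:

    import org.apache.http.client.methods.HttpPut;

    public class EntitySketch {
        // Sketch only: toPut() and the URL below are illustrative, not connector API.
        static HttpPut toPut(BatchRecordBuffer recordBuffer) {
            HttpPut put = new HttpPut("http://127.0.0.1:8030/api/db/tbl/_stream_load");
            // Content length is known up front (isChunked() is false), so the request
            // carries a Content-Length header; isRepeatable() permits safe retries.
            put.setEntity(new BatchBufferHttpEntity(recordBuffer));
            return put;
        }
    }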
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
new file mode 100644
index 00000000..a782bb53
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchBufferStream.java
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+
+public class BatchBufferStream extends InputStream {
+ private final Iterator<byte[]> iterator;
+ private byte[] currentRow;
+ private int currentPos;
+
+ public BatchBufferStream(List<byte[]> buffer) {
+ this.iterator = buffer.iterator();
+ }
+
+ @Override
+ public int read() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int read(byte[] buf) throws IOException {
+ return read(buf, 0, buf.length);
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ if (!iterator.hasNext() && currentRow == null) {
+ return -1;
+ }
+
+ byte[] item = currentRow;
+ int pos = currentPos;
+ int readBytes = 0;
+ while (readBytes < len && (item != null || iterator.hasNext())) {
+ if (item == null) {
+ item = iterator.next();
+ pos = 0;
+ }
+
+ int size = Math.min(len - readBytes, item.length - pos);
+ System.arraycopy(item, pos, buf, off + readBytes, size);
+ readBytes += size;
+ pos += size;
+
+ if (pos == item.length) {
+ item = null;
+ pos = 0;
+ }
+ }
+ currentRow = item;
+ currentPos = pos;
+ return readBytes;
+ }
+}
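Only the array overloads of read are fully implemented above; the single-byte read() is a stub returning 0, which the connector never exercises because BatchBufferHttpEntity.writeTo drains the stream through the byte[] variant in 4 KB chunks. A self-contained sketch of that draining pattern, assuming it sits in (or imports from) the same package; the sample rows are made up:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.Arrays;

    public class DrainSketch {
        public static void main(String[] args) throws IOException {
            // Two mock rows; in the connector the list comes from a BatchRecordBuffer.
            BatchBufferStream in =
                    new BatchBufferStream(Arrays.asList("a,1".getBytes(), "b,2".getBytes()));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                out.write(chunk, 0, n);
            }
            // Rows are copied back-to-back; delimiters, if any, live in the list itself.
            System.out.println(out.size()); // 6
        }
    }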
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
index df40e7a9..8eb98037 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
@@ -17,80 +17,52 @@
package org.apache.doris.flink.sink.batch;
-import org.apache.flink.annotation.VisibleForTesting;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
+import java.util.LinkedList;
/** buffer to queue. */
public class BatchRecordBuffer {
private static final Logger LOG = LoggerFactory.getLogger(BatchRecordBuffer.class);
public static final String LINE_SEPARATOR = "\n";
private String labelName;
- private ByteBuffer buffer;
+ private LinkedList<byte[]> buffer;
private byte[] lineDelimiter;
private int numOfRecords = 0;
- private int bufferSizeBytes = 0;
+ private long bufferSizeBytes = 0;
private boolean loadBatchFirstRecord = true;
private String database;
private String table;
+ private final long createTime = System.currentTimeMillis();
+ private long retainTime = 0;
- public BatchRecordBuffer() {}
-
- public BatchRecordBuffer(byte[] lineDelimiter, int bufferSize) {
- super();
- this.lineDelimiter = lineDelimiter;
- this.buffer = ByteBuffer.allocate(bufferSize);
+ public BatchRecordBuffer() {
+ this.buffer = new LinkedList<>();
}
- public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, int bufferSize) {
+ public BatchRecordBuffer(String database, String table, byte[] lineDelimiter, long retainTime) {
super();
this.database = database;
this.table = table;
this.lineDelimiter = lineDelimiter;
- this.buffer = ByteBuffer.allocate(bufferSize);
+ this.buffer = new LinkedList<>();
+ this.retainTime = retainTime;
}
- public void insert(byte[] record) {
- ensureCapacity(record.length);
+ public int insert(byte[] record) {
+ int recordSize = record.length;
if (loadBatchFirstRecord) {
loadBatchFirstRecord = false;
} else if (lineDelimiter != null) {
- this.buffer.put(this.lineDelimiter);
+ this.buffer.add(this.lineDelimiter);
+ setBufferSizeBytes(this.bufferSizeBytes + this.lineDelimiter.length);
+ recordSize += this.lineDelimiter.length;
}
- this.buffer.put(record);
- setNumOfRecords(getNumOfRecords() + 1);
- setBufferSizeBytes(getBufferSizeBytes() + record.length);
- }
-
- @VisibleForTesting
- public void ensureCapacity(int length) {
- int lineDelimiterSize = this.lineDelimiter == null ? 0 : this.lineDelimiter.length;
- if (buffer.remaining() - lineDelimiterSize >= length) {
- return;
- }
- int currentRemain = buffer.remaining();
- int currentCapacity = buffer.capacity();
- // add lineDelimiter length
- int needed = length - buffer.remaining() + lineDelimiterSize;
- // grow at least 1MB
- long grow = Math.max(needed, 1024 * 1024);
- // grow at least 50% of the current size
- grow = Math.max(buffer.capacity() / 2, grow);
- int newCapacity = (int) Math.min(Integer.MAX_VALUE, buffer.capacity() + grow);
- ByteBuffer tmp = ByteBuffer.allocate(newCapacity);
- buffer.flip();
- tmp.put(buffer);
- buffer.clear();
- buffer = tmp;
- LOG.info(
- "record length {},buffer remain {} ,grow capacity {} to {}",
- length,
- currentRemain,
- currentCapacity,
- newCapacity);
+ this.buffer.add(record);
+ setNumOfRecords(this.numOfRecords + 1);
+ setBufferSizeBytes(this.bufferSizeBytes + record.length);
+ return recordSize;
}
public String getLabelName() {
@@ -106,13 +78,6 @@ public class BatchRecordBuffer {
return numOfRecords == 0;
}
- public ByteBuffer getData() {
- // change mode
- buffer.flip();
- LOG.debug("flush buffer: {} records, {} bytes", getNumOfRecords(),
getBufferSizeBytes());
- return buffer;
- }
-
public void clear() {
this.buffer.clear();
this.numOfRecords = 0;
@@ -121,7 +86,7 @@ public class BatchRecordBuffer {
this.loadBatchFirstRecord = true;
}
- public ByteBuffer getBuffer() {
+ public LinkedList<byte[]> getBuffer() {
return buffer;
}
@@ -131,7 +96,7 @@ public class BatchRecordBuffer {
}
/** @return Buffer size in bytes */
- public int getBufferSizeBytes() {
+ public long getBufferSizeBytes() {
return bufferSizeBytes;
}
@@ -141,7 +106,7 @@ public class BatchRecordBuffer {
}
/** @param bufferSizeBytes Updates sum of size of records present in this buffer (Bytes) */
- public void setBufferSizeBytes(int bufferSizeBytes) {
+ public void setBufferSizeBytes(long bufferSizeBytes) {
this.bufferSizeBytes = bufferSizeBytes;
}
@@ -160,4 +125,22 @@ public class BatchRecordBuffer {
public void setTable(String table) {
this.table = table;
}
+
+ public String getTableIdentifier() {
+ if (database != null && table != null) {
+ return database + "." + table;
+ }
+ return null;
+ }
+
+ public byte[] getLineDelimiter() {
+ return lineDelimiter;
+ }
+
+ public boolean shouldFlush() {
+ // When the buffer create time is later than the first interval trigger,
+ // the write will not be triggered in the next interval,
+ // so multiply it by 1.5 to trigger it as early as possible.
+ return (System.currentTimeMillis() - createTime) * 1.5 > retainTime;
+ }
}
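Two behavioral details of the rewritten buffer are easy to miss: insert() now returns the bytes it actually appended (record plus any leading line delimiter) so the caller can keep a global cache counter, and shouldFlush() treats a buffer as flushable once 1.5 times its age exceeds retainTime, i.e. once it is older than two-thirds of the flush interval. A small sketch, assuming retainTime carries the flush interval in milliseconds:

    import java.nio.charset.StandardCharsets;

    public class BufferSketch {
        public static void main(String[] args) throws InterruptedException {
            byte[] lf = "\n".getBytes(StandardCharsets.UTF_8);
            BatchRecordBuffer buffer = new BatchRecordBuffer("db", "tbl", lf, 1000);
            int first = buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));  // 7 bytes, no delimiter
            int second = buffer.insert("doris,2".getBytes(StandardCharsets.UTF_8)); // 8 bytes: "\n" + record
            System.out.println(buffer.getBufferSizeBytes()); // 15

            Thread.sleep(700); // 700 ms * 1.5 = 1050 ms > 1000 ms
            System.out.println(buffer.shouldFlush()); // true
        }
    }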
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 2dd7a50e..3240dafe 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -36,7 +36,6 @@ import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
@@ -45,7 +44,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -61,7 +59,11 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
@@ -82,6 +84,8 @@ public class DorisBatchStreamLoad implements Serializable {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
+ private static final long STREAM_LOAD_MAX_BYTES = 10 * 1024 * 1024 * 1024L; // 10 GB
+ private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
private final LabelGenerator labelGenerator;
private final byte[] lineDelimiter;
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
@@ -103,6 +107,10 @@ public class DorisBatchStreamLoad implements Serializable {
private boolean enableGroupCommit;
private boolean enableGzCompress;
private int subTaskId;
+ private long maxBlockedBytes;
+ private final AtomicLong currentCacheBytes = new AtomicLong(0L);
+ private final Lock lock = new ReentrantLock();
+ private final Condition block = lock.newCondition();
public DorisBatchStreamLoad(
DorisOptions dorisOptions,
@@ -137,6 +145,10 @@ public class DorisBatchStreamLoad implements Serializable {
this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
+ // maxBlockedBytes ensures that a buffer can be written even if the queue is full
+ this.maxBlockedBytes =
+ (long) executionOptions.getBufferFlushMaxBytes()
+ * (executionOptions.getFlushQueueSize() + 1);
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
Preconditions.checkState(
@@ -144,7 +156,7 @@ public class DorisBatchStreamLoad implements Serializable {
"tableIdentifier input error, the format is
database.table");
this.loadUrl = String.format(LOAD_URL_PATTERN, hostPort,
tableInfo[0], tableInfo[1]);
}
- this.loadAsyncExecutor = new LoadAsyncExecutor();
+ this.loadAsyncExecutor = new LoadAsyncExecutor(executionOptions.getFlushQueueSize());
this.loadExecutorService =
new ThreadPoolExecutor(
1,
@@ -165,10 +177,10 @@ public class DorisBatchStreamLoad implements Serializable {
* @param record
* @throws IOException
*/
- public synchronized void writeRecord(String database, String table, byte[] record)
- throws InterruptedException {
+ public synchronized void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
+
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
bufferKey,
@@ -177,30 +189,91 @@ public class DorisBatchStreamLoad implements Serializable {
database,
table,
this.lineDelimiter,
- executionOptions.getBufferFlushMaxBytes()));
- buffer.insert(record);
- // When it exceeds 80% of the byteSize,to flush, to avoid triggering bytebuffer expansion
- if (buffer.getBufferSizeBytes() >= executionOptions.getBufferFlushMaxBytes() * 0.8
- || (executionOptions.getBufferFlushMaxRows() != 0
- && buffer.getNumOfRecords() >= executionOptions.getBufferFlushMaxRows())) {
- flush(bufferKey, false);
+ executionOptions.getBufferFlushIntervalMs()));
+
+ int bytes = buffer.insert(record);
+ currentCacheBytes.addAndGet(bytes);
+ if (currentCacheBytes.get() > maxBlockedBytes) {
+ lock.lock();
+ try {
+ while (currentCacheBytes.get() >= maxBlockedBytes) {
+ LOG.info(
+ "Cache full, waiting for flush, currentBytes: {},
maxBlockedBytes: {}",
+ currentCacheBytes.get(),
+ maxBlockedBytes);
+ block.await(1, TimeUnit.SECONDS);
+ }
+ } catch (InterruptedException e) {
+ this.exception.set(e);
+ throw new RuntimeException(e);
+ } finally {
+ lock.unlock();
+ }
}
+
+ // queue has space, flush according to the bufferMaxRows/bufferMaxBytes
+ if (flushQueue.size() < executionOptions.getFlushQueueSize()
+ && (buffer.getBufferSizeBytes() >=
executionOptions.getBufferFlushMaxBytes()
+ || buffer.getNumOfRecords() >=
executionOptions.getBufferFlushMaxRows())) {
+ boolean flush = bufferFullFlush(bufferKey);
+ LOG.info("trigger flush by buffer full, flush: {}", flush);
+
+ } else if (buffer.getBufferSizeBytes() >= STREAM_LOAD_MAX_BYTES
+ || buffer.getNumOfRecords() >= STREAM_LOAD_MAX_ROWS) {
+ // The buffer capacity exceeds the stream load limit, flush
+ boolean flush = bufferFullFlush(bufferKey);
+ LOG.info("trigger flush by buffer exceeding the limit, flush: {}",
flush);
+ }
+ }
+
+ public synchronized boolean bufferFullFlush(String bufferKey) {
+ return doFlush(bufferKey, false, true);
+ }
+
+ public synchronized boolean intervalFlush() {
+ return doFlush(null, false, false);
}
- public synchronized void flush(String bufferKey, boolean waitUtilDone)
- throws InterruptedException {
+ public synchronized boolean checkpointFlush() {
+ return doFlush(null, true, false);
+ }
+
+ private synchronized boolean doFlush(
+ String bufferKey, boolean waitUtilDone, boolean bufferFull) {
checkFlushException();
+ if (waitUtilDone || bufferFull) {
+ boolean flush = flush(bufferKey, waitUtilDone);
+ return flush;
+ } else if (flushQueue.size() < executionOptions.getFlushQueueSize()) {
+ boolean flush = flush(bufferKey, false);
+ return flush;
+ }
+ return false;
+ }
+
+ private synchronized boolean flush(String bufferKey, boolean waitUtilDone) {
if (null == bufferKey) {
+ boolean flush = false;
for (String key : bufferMap.keySet()) {
- flushBuffer(key);
+ BatchRecordBuffer buffer = bufferMap.get(key);
+ if (waitUtilDone || buffer.shouldFlush()) {
+ // Ensure that the interval satisfies intervalMS
+ flushBuffer(key);
+ flush = true;
+ }
+ }
+ if (!waitUtilDone && !flush) {
+ return false;
}
} else if (bufferMap.containsKey(bufferKey)) {
flushBuffer(bufferKey);
+ } else {
+ throw new DorisBatchLoadException("buffer not found for key: " +
bufferKey);
}
-
if (waitUtilDone) {
waitAsyncLoadFinish();
}
+ return true;
}
private synchronized void flushBuffer(String bufferKey) {
@@ -247,20 +320,96 @@ public class DorisBatchStreamLoad implements Serializable {
this.flushQueue.clear();
}
+ @VisibleForTesting
+ public boolean mergeBuffer(List<BatchRecordBuffer> recordList,
BatchRecordBuffer buffer) {
+ boolean merge = false;
+ if (recordList.size() > 1) {
+ boolean sameTable =
+ recordList.stream()
+ .map(BatchRecordBuffer::getTableIdentifier)
+ .distinct()
+ .count()
+ == 1;
+ // Buffers can be merged only if they belong to the same table.
+ if (sameTable) {
+ for (BatchRecordBuffer recordBuffer : recordList) {
+ if (recordBuffer != null
+ && recordBuffer.getLabelName() != null
+ && !buffer.getLabelName().equals(recordBuffer.getLabelName())
+ && !recordBuffer.getBuffer().isEmpty()) {
+ merge(buffer, recordBuffer);
+ merge = true;
+ }
+ }
+ LOG.info(
+ "merge {} buffer to one stream load, result
bufferBytes {}",
+ recordList.size(),
+ buffer.getBufferSizeBytes());
+ }
+ }
+ return merge;
+ }
+
+ private boolean merge(BatchRecordBuffer mergeBuffer, BatchRecordBuffer buffer) {
+ if (buffer.getBuffer().isEmpty()) {
+ return false;
+ }
+ if (!mergeBuffer.getBuffer().isEmpty()) {
+ mergeBuffer.getBuffer().add(mergeBuffer.getLineDelimiter());
+ mergeBuffer.setBufferSizeBytes(
+ mergeBuffer.getBufferSizeBytes() + mergeBuffer.getLineDelimiter().length);
+ currentCacheBytes.addAndGet(buffer.getLineDelimiter().length);
+ }
+ mergeBuffer.getBuffer().addAll(buffer.getBuffer());
+ mergeBuffer.setNumOfRecords(mergeBuffer.getNumOfRecords() + buffer.getNumOfRecords());
+ mergeBuffer.setBufferSizeBytes(
+ mergeBuffer.getBufferSizeBytes() + buffer.getBufferSizeBytes());
+ return true;
+ }
+
class LoadAsyncExecutor implements Runnable {
+
+ private int flushQueueSize;
+
+ public LoadAsyncExecutor(int flushQueueSize) {
+ this.flushQueueSize = flushQueueSize;
+ }
+
@Override
public void run() {
LOG.info("LoadAsyncExecutor start");
loadThreadAlive = true;
+ List<BatchRecordBuffer> recordList = new
ArrayList<>(flushQueueSize);
while (started.get()) {
- BatchRecordBuffer buffer = null;
+ recordList.clear();
try {
- buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
- if (buffer == null) {
+ BatchRecordBuffer buffer = flushQueue.poll(2000L, TimeUnit.MILLISECONDS);
+ if (buffer == null || buffer.getLabelName() == null) {
+ // label is empty and does not need to load. It is the flag of waitUtilDone
continue;
}
- if (buffer.getLabelName() != null) {
- load(buffer.getLabelName(), buffer);
+ recordList.add(buffer);
+ boolean merge = false;
+ if (!flushQueue.isEmpty()) {
+ flushQueue.drainTo(recordList, flushQueueSize - 1);
+ if (mergeBuffer(recordList, buffer)) {
+ load(buffer.getLabelName(), buffer);
+ merge = true;
+ }
+ }
+
+ if (!merge) {
+ for (BatchRecordBuffer bf : recordList) {
+ if (bf == null || bf.getLabelName() == null) {
+ continue;
+ }
+ load(bf.getLabelName(), bf);
+ }
+ }
+
+ if (flushQueue.size() < flushQueueSize) {
+ // Avoid waiting for 2 rounds of intervalMs
+ doFlush(null, false, false);
}
} catch (Exception e) {
LOG.error("worker running error", e);
@@ -280,9 +429,8 @@ public class DorisBatchStreamLoad implements Serializable {
label = null;
}
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
- ByteBuffer data = buffer.getData();
- ByteArrayEntity entity =
- new ByteArrayEntity(data.array(), data.arrayOffset(), data.limit());
+
+ BatchBufferHttpEntity entity = new BatchBufferHttpEntity(buffer);
HttpPutBuilder putBuilder = new HttpPutBuilder();
putBuilder
.setUrl(loadUrl)
@@ -321,6 +469,18 @@ public class DorisBatchStreamLoad implements Serializable {
respContent.getErrorURL());
throw new DorisBatchLoadException(errMsg);
} else {
+ long cacheByteBeforeFlush =
+ currentCacheBytes.getAndAdd(-respContent.getLoadBytes());
+ LOG.info(
+ "load success, cacheBeforeFlushBytes:
{}, currentCacheBytes : {}",
+ cacheByteBeforeFlush,
+ currentCacheBytes.get());
+ lock.lock();
+ try {
+ block.signal();
+ } finally {
+ lock.unlock();
+ }
return;
}
}
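The load path now participates in back-pressure: writeRecord blocks once currentCacheBytes reaches maxBlockedBytes ((flushQueueSize + 1) x bufferFlushMaxBytes), and a successful load subtracts the flushed bytes and signals the condition to release the writer. A stripped-down illustration of that producer/consumer pattern follows; the field names mirror the class above, but this is a sketch, not the connector code:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class CacheBackPressure {
        private final AtomicLong currentCacheBytes = new AtomicLong(0);
        private final long maxBlockedBytes;
        private final Lock lock = new ReentrantLock();
        private final Condition block = lock.newCondition();

        CacheBackPressure(long maxBlockedBytes) {
            this.maxBlockedBytes = maxBlockedBytes;
        }

        // Producer side (writeRecord): park while the cache is over the limit.
        void onWrite(int bytes) throws InterruptedException {
            currentCacheBytes.addAndGet(bytes);
            lock.lock();
            try {
                while (currentCacheBytes.get() >= maxBlockedBytes) {
                    block.await(1, TimeUnit.SECONDS); // re-check at least once a second
                }
            } finally {
                lock.unlock();
            }
        }

        // Consumer side (load success): release flushed bytes and wake the writer.
        void onFlushed(long flushedBytes) {
            currentCacheBytes.addAndGet(-flushedBytes);
            lock.lock();
            try {
                block.signal();
            } finally {
                lock.unlock();
            }
        }
    }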
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6fbde55d..db486bcb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -107,9 +107,9 @@ public class DorisBatchWriter<IN>
private void intervalFlush() {
try {
- LOG.info("interval flush triggered.");
- batchStreamLoad.flush(null, false);
- } catch (InterruptedException e) {
+ boolean flush = batchStreamLoad.intervalFlush();
+ LOG.debug("interval flush trigger, flush: {}", flush);
+ } catch (Exception e) {
flushException = e;
}
}
@@ -125,7 +125,7 @@ public class DorisBatchWriter<IN>
checkFlushException();
writeOneDorisRecord(serializer.flush());
LOG.info("checkpoint flush triggered.");
- batchStreamLoad.flush(null, true);
+ batchStreamLoad.checkpointFlush();
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
similarity index 99%
copy from flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
copy to flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
index df40e7a9..e5f4c4eb 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/BatchRecordBuffer.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchRecordBuffer.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;
import org.apache.flink.annotation.VisibleForTesting;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
index be8adcb0..2c5ed5c2 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java
@@ -28,7 +28,6 @@ import org.apache.doris.flink.exception.DorisBatchLoadException;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
-import org.apache.doris.flink.sink.batch.BatchRecordBuffer;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index 02e59084..2a1c9b1a 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -275,14 +275,14 @@ public class DorisConfigOptions {
public static final ConfigOption<Integer> SINK_BUFFER_FLUSH_MAX_ROWS =
ConfigOptions.key("sink.buffer-flush.max-rows")
.intType()
- .defaultValue(50000)
+ .defaultValue(500000)
.withDescription(
"The maximum number of flush items in each batch,
the default is 5w");
public static final ConfigOption<MemorySize> SINK_BUFFER_FLUSH_MAX_BYTES =
ConfigOptions.key("sink.buffer-flush.max-bytes")
.memoryType()
- .defaultValue(MemorySize.parse("10mb"))
+ .defaultValue(MemorySize.parse("100mb"))
.withDescription(
"The maximum number of bytes flushed in each
batch, the default is 10MB");
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
index 9cc19716..bc19c572 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/cfg/DorisExecutionOptionsTest.java
@@ -52,9 +52,9 @@ public class DorisExecutionOptionsTest {
.setWriteMode(WriteMode.STREAM_LOAD)
.setLabelPrefix("doris")
.enable2PC()
- .setBufferFlushMaxBytes(10)
+ .setBufferFlushMaxBytes(10485760)
.setBufferFlushIntervalMs(10000)
- .setBufferFlushMaxRows(12)
+ .setBufferFlushMaxRows(10000)
.setCheckInterval(10)
.setIgnoreCommitError(true)
.setDeletable(true)
@@ -72,9 +72,9 @@ public class DorisExecutionOptionsTest {
.setWriteMode(WriteMode.STREAM_LOAD)
.setLabelPrefix("doris")
.enable2PC()
- .setBufferFlushMaxBytes(10)
+ .setBufferFlushMaxBytes(10485760)
.setBufferFlushIntervalMs(10000)
- .setBufferFlushMaxRows(12)
+ .setBufferFlushMaxRows(10000)
.setCheckInterval(10)
.setIgnoreCommitError(true)
.setDeletable(true)
@@ -111,17 +111,17 @@ public class DorisExecutionOptionsTest {
Assert.assertNotEquals(exceptOptions, builder.build());
builder.enable2PC();
- builder.setBufferFlushMaxBytes(11);
+ builder.setBufferFlushMaxBytes(104857601);
Assert.assertNotEquals(exceptOptions, builder.build());
- builder.setBufferFlushMaxBytes(10);
+ builder.setBufferFlushMaxBytes(10485760);
builder.setBufferFlushIntervalMs(100001);
Assert.assertNotEquals(exceptOptions, builder.build());
builder.setBufferFlushIntervalMs(10000);
- builder.setBufferFlushMaxRows(2);
+ builder.setBufferFlushMaxRows(10000);
Assert.assertNotEquals(exceptOptions, builder.build());
- builder.setBufferFlushMaxRows(12);
+ builder.setBufferFlushMaxRows(10000);
builder.setCheckInterval(11);
Assert.assertNotEquals(exceptOptions, builder.build());
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index aa3d00da..de0ef041 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -208,9 +208,9 @@ public class DorisSinkITCase extends DorisTestBase {
+ " 'sink.enable.batch-mode' = 'true',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.flush.queue-size' = '2',"
- + " 'sink.buffer-flush.max-rows' = '1',"
- + " 'sink.buffer-flush.max-bytes' = '5',"
- + " 'sink.buffer-flush.interval' = '10s'"
+ + " 'sink.buffer-flush.max-rows' = '10000',"
+ + " 'sink.buffer-flush.max-bytes' = '10MB',"
+ + " 'sink.buffer-flush.interval' = '1s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_CSV_BATCH_TBL,
@@ -219,7 +219,7 @@ public class DorisSinkITCase extends DorisTestBase {
tEnv.executeSql(sinkDDL);
tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all
SELECT 'flink',2");
- Thread.sleep(10000);
+ Thread.sleep(20000);
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format(
@@ -248,8 +248,9 @@ public class DorisSinkITCase extends DorisTestBase {
executionBuilder
.setLabelPrefix(UUID.randomUUID().toString())
.setStreamLoadProp(properties)
- .setBufferFlushMaxBytes(1)
- .setBufferFlushMaxRows(10);
+ .setBufferFlushMaxBytes(10485760)
+ .setBufferFlushMaxRows(10000)
+ .setBufferFlushIntervalMs(1000);
builder.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
@@ -258,7 +259,7 @@ public class DorisSinkITCase extends DorisTestBase {
env.fromElements("doris,1", "flink,2").sinkTo(builder.build());
env.execute();
- Thread.sleep(10000);
+ Thread.sleep(20000);
List<String> expected = Arrays.asList("doris,1", "flink,2");
String query =
String.format(
@@ -295,9 +296,9 @@ public class DorisSinkITCase extends DorisTestBase {
+ " 'sink.enable.batch-mode' = 'true',"
+ " 'sink.enable-delete' = 'true',"
+ " 'sink.flush.queue-size' = '2',"
- + " 'sink.buffer-flush.max-rows' = '3',"
- + " 'sink.buffer-flush.max-bytes' = '5000',"
- + " 'sink.buffer-flush.interval' = '10s'"
+ + " 'sink.buffer-flush.max-rows' = '10000',"
+ + " 'sink.buffer-flush.max-bytes' = '10MB',"
+ + " 'sink.buffer-flush.interval' = '1s'"
+ ")",
getFenodes(),
DATABASE + "." + TABLE_GROUP_COMMIT,
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
new file mode 100644
index 00000000..fe20c544
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferHttpEntity.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferHttpEntity {
+
+ @Test
+ public void testWrite() throws Exception {
+ BatchRecordBuffer recordBuffer = TestBatchBufferStream.mockBuffer();
+ byte[] expectedData = TestBatchBufferStream.mergeByteArrays(recordBuffer.getBuffer());
+ Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+ BatchBufferHttpEntity entity = new BatchBufferHttpEntity(recordBuffer);
+ assertTrue(entity.isRepeatable());
+ assertFalse(entity.isStreaming());
+ assertEquals(entity.getContentLength(), expectedData.length);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ entity.writeTo(outputStream);
+ assertArrayEquals(expectedData, outputStream.toByteArray());
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
new file mode 100644
index 00000000..3dad5b60
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchBufferStream.java
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.batch;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestBatchBufferStream {
+
+ @Test
+ public void testRead() throws Exception {
+ BatchRecordBuffer recordBuffer = mockBuffer();
+ byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+ Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+ byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+ try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+ int len = inputStream.read(actualData, 0, actualData.length);
+ assertEquals(actualData.length, len);
+ assertArrayEquals(expectedData, actualData);
+ }
+ }
+
+ @Test
+ public void testReadBufLen() throws Exception {
+ BatchRecordBuffer recordBuffer = mockBuffer();
+ byte[] expectedData = mergeByteArrays(recordBuffer.getBuffer());
+ Assert.assertEquals(recordBuffer.getNumOfRecords(), 1000);
+
+ byte[] actualData = new byte[(int) recordBuffer.getBufferSizeBytes()];
+ try (BatchBufferStream inputStream = new BatchBufferStream(recordBuffer.getBuffer())) {
+ int pos = 0;
+ while (pos < actualData.length) {
+ // mock random length
+ int maxLen = new Random().nextInt(actualData.length - pos) + 1;
+ int len = inputStream.read(actualData, pos, maxLen);
+ if (len == -1) {
+ break;
+ }
+ assertTrue(len > 0 && len <= maxLen);
+ pos += len;
+ }
+ assertEquals(actualData.length, pos);
+ assertArrayEquals(expectedData, actualData);
+ }
+ }
+
+ public static BatchRecordBuffer mockBuffer() {
+ BatchRecordBuffer recordBuffer = new BatchRecordBuffer();
+ for (int i = 0; i < 1000; i++) {
+ recordBuffer.insert((UUID.randomUUID() + "," + i).getBytes());
+ }
+ return recordBuffer;
+ }
+
+ public static byte[] mergeByteArrays(List<byte[]> listOfByteArrays) {
+ int totalLength = 0;
+ for (byte[] byteArray : listOfByteArrays) {
+ totalLength += byteArray.length;
+ }
+
+ byte[] mergedArray = new byte[totalLength];
+
+ int currentPosition = 0;
+ for (byte[] byteArray : listOfByteArrays) {
+ System.arraycopy(byteArray, 0, mergedArray, currentPosition, byteArray.length);
+ currentPosition += byteArray.length;
+ }
+
+ return mergedArray;
+ }
+}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
index d52149d6..62d84c99 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java
@@ -41,9 +41,13 @@ import org.mockito.MockedStatic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.charset.StandardCharsets;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.doris.flink.sink.batch.TestBatchBufferStream.mergeByteArrays;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -84,8 +88,10 @@ public class TestDorisBatchStreamLoad {
@Test
public void testLoadFail() throws Exception {
+ LOG.info("testLoadFail start");
DorisReadOptions readOptions = DorisReadOptions.builder().build();
- DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+ DorisExecutionOptions executionOptions =
+ DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
DorisOptions options =
DorisOptions.builder()
.setFenodes("127.0.0.1:1")
@@ -104,7 +110,7 @@ public class TestDorisBatchStreamLoad {
() -> loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(10)),
100L,
- "Condition was not met in given timeout.");
+ "testLoadFail wait loader start failed.");
Assert.assertTrue(loader.isLoadThreadAlive());
BackendUtil backendUtil = mock(BackendUtil.class);
HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -118,17 +124,25 @@ public class TestDorisBatchStreamLoad {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
- loader.flush("db.tbl", true);
+ loader.checkpointFlush();
+ TestUtil.waitUntilCondition(
+ () -> !loader.isLoadThreadAlive(),
+ Deadline.fromNow(Duration.ofSeconds(20)),
+ 100L,
+ "testLoadFail wait loader exit failed." +
loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load
error"));
+ LOG.info("testLoadFail end");
}
@Test
public void testLoadError() throws Exception {
+ LOG.info("testLoadError start");
DorisReadOptions readOptions = DorisReadOptions.builder().build();
- DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+ DorisExecutionOptions executionOptions =
+ DorisExecutionOptions.builder().setBufferFlushIntervalMs(1000).build();
DorisOptions options =
DorisOptions.builder()
.setFenodes("127.0.0.1:1")
@@ -148,7 +162,7 @@ public class TestDorisBatchStreamLoad {
() -> loader.isLoadThreadAlive(),
Deadline.fromNow(Duration.ofSeconds(10)),
100L,
- "Condition was not met in given timeout.");
+ "testLoadError wait loader start failed.");
Assert.assertTrue(loader.isLoadThreadAlive());
BackendUtil backendUtil = mock(BackendUtil.class);
HttpClientBuilder httpClientBuilder = mock(HttpClientBuilder.class);
@@ -161,12 +175,17 @@ public class TestDorisBatchStreamLoad {
when(httpClientBuilder.build()).thenReturn(httpClient);
when(httpClient.execute(any())).thenReturn(response);
loader.writeRecord("db", "tbl", "1,data".getBytes());
- loader.flush("db.tbl", true);
+ loader.checkpointFlush();
+ TestUtil.waitUntilCondition(
+ () -> !loader.isLoadThreadAlive(),
+ Deadline.fromNow(Duration.ofSeconds(20)),
+ 100L,
+ "testLoadError wait loader exit failed." +
loader.isLoadThreadAlive());
AtomicReference<Throwable> exception = loader.getException();
-
Assert.assertTrue(exception.get() instanceof Exception);
Assert.assertTrue(exception.get().getMessage().contains("stream load
error"));
+ LOG.info("testLoadError end");
}
@After
@@ -175,4 +194,51 @@ public class TestDorisBatchStreamLoad {
backendUtilMockedStatic.close();
}
}
+
+ @Test
+ public void mergeBufferTest() {
+ DorisReadOptions readOptions = DorisReadOptions.builder().build();
+ DorisExecutionOptions executionOptions = DorisExecutionOptions.builder().build();
+ DorisOptions options =
+ DorisOptions.builder()
+ .setFenodes("127.0.0.1:8030")
+ .setBenodes("127.0.0.1:9030")
+ .setTableIdentifier("db.tbl")
+ .build();
+
+ DorisBatchStreamLoad loader =
+ new DorisBatchStreamLoad(
+ options, readOptions, executionOptions, new LabelGenerator("xx", false), 0);
+
+ List<BatchRecordBuffer> bufferList = new ArrayList<>();
+ BatchRecordBuffer recordBuffer =
+ new BatchRecordBuffer("db", "tbl",
"\n".getBytes(StandardCharsets.UTF_8), 0);
+ recordBuffer.insert("doris,2".getBytes(StandardCharsets.UTF_8));
+ recordBuffer.setLabelName("label2");
+ BatchRecordBuffer buffer =
+ new BatchRecordBuffer("db", "tbl",
"\n".getBytes(StandardCharsets.UTF_8), 0);
+ buffer.insert("doris,1".getBytes(StandardCharsets.UTF_8));
+ buffer.setLabelName("label1");
+
+ boolean flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(false, flag);
+
+ bufferList.add(buffer);
+ bufferList.add(recordBuffer);
+ flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(true, flag);
+ byte[] bytes = mergeByteArrays(buffer.getBuffer());
+ Assert.assertArrayEquals(bytes, "doris,1\ndoris,2".getBytes(StandardCharsets.UTF_8));
+
+ // multi table
+ bufferList.clear();
+ bufferList.add(buffer);
+ BatchRecordBuffer recordBuffer2 =
+ new BatchRecordBuffer("db", "tbl2",
"\n".getBytes(StandardCharsets.UTF_8), 0);
+ recordBuffer2.insert("doris,3".getBytes(StandardCharsets.UTF_8));
+ recordBuffer2.setLabelName("label3");
+ bufferList.add(recordBuffer2);
+ flag = loader.mergeBuffer(bufferList, buffer);
+ Assert.assertEquals(false, flag);
+ }
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
similarity index 99%
rename from flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
rename to flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
index 1a6897c9..3107225a 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/batch/TestBatchRecordBuffer.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestBatchRecordBuffer.java
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.flink.sink.batch;
+package org.apache.doris.flink.sink.copy;
import org.junit.Assert;
import org.junit.Test;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
index 05a93dc5..56887d93 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/table/DorisDynamicTableFactoryTest.java
@@ -135,7 +135,7 @@ public class DorisDynamicTableFactoryTest {
properties.put("sink.use-cache", "true");
properties.put("sink.enable.batch-mode", "true");
properties.put("sink.flush.queue-size", "2");
- properties.put("sink.buffer-flush.max-rows", "1000");
+ properties.put("sink.buffer-flush.max-rows", "10000");
properties.put("sink.buffer-flush.max-bytes", "10MB");
properties.put("sink.buffer-flush.interval", "10s");
properties.put("sink.ignore.update-before", "true");
@@ -169,7 +169,7 @@ public class DorisDynamicTableFactoryTest {
.enable2PC()
.setBufferFlushIntervalMs(10000)
.setBufferFlushMaxBytes(10 * 1024 * 1024)
- .setBufferFlushMaxRows(1000)
+ .setBufferFlushMaxRows(10000)
.setFlushQueueSize(2)
.setUseCache(true)
.setIgnoreCommitError(false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]