This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 654ec15fff5 [Fix](ecosystem) Fix kettle plugin load field
correspondence problem (#48412)
654ec15fff5 is described below
commit 654ec15fff5dc6d76679a7f1ed941896974f17dd
Author: wudi <[email protected]>
AuthorDate: Thu Feb 27 16:16:53 2025 +0800
[Fix](ecosystem) Fix kettle plugin load field correspondence problem
(#48412)
### What problem does this PR solve?
Issue Number: close #48302
1. Fix the Kettle plugin's load field correspondence problem
2. Optimize some of the stream load write logic
---
.../steps/dorisstreamloader/DorisStreamLoader.java | 28 ++++++++-----
.../load/DorisBatchStreamLoad.java | 47 ++++++++++++++++------
.../steps/dorisstreamloader/load/DorisOptions.java | 2 +-
3 files changed, 53 insertions(+), 24 deletions(-)
diff --git
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
index 91b546baf21..1303d628265 100644
---
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
+++
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/DorisStreamLoader.java
@@ -72,6 +72,7 @@ public class DorisStreamLoader extends BaseStep implements
StepInterface {
return false;
}
if ( first ) {
+ logDebug("First process row with meta : " + meta.toString());
first = false;
// Cache field indexes.
data.keynrs = new int[meta.getFieldStream().length];
@@ -84,11 +85,15 @@ public class DorisStreamLoader extends BaseStep implements
StepInterface {
data.formatMeta[i] = sourceMeta.clone();
}
+ // use field table name to serializer data
+ String[] fieldNames = new String[meta.getFieldTable().length];
+ System.arraycopy(meta.getFieldTable(), 0, fieldNames, 0,
meta.getFieldTable().length);
+
Properties loadProperties = options.getStreamLoadProp();
//builder serializer
data.serializer = DorisRecordSerializer.builder()
.setType(loadProperties.getProperty(FORMAT_KEY, CSV))
- .setFieldNames(getInputRowMeta().getFieldNames())
+ .setFieldNames(fieldNames)
.setFormatMeta(data.formatMeta)
.setFieldDelimiter(loadProperties.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT))
.setLogChannelInterface(log)
@@ -136,17 +141,18 @@ public class DorisStreamLoader extends BaseStep
implements StepInterface {
}
}
}
+
options = DorisOptions.builder()
- .withFenodes(meta.getFenodes())
- .withDatabase(meta.getDatabase())
- .withTable(meta.getTable())
- .withUsername(meta.getUsername())
- .withPassword(meta.getPassword())
- .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
- .withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
- .withMaxRetries(meta.getMaxRetries())
- .withStreamLoadProp(streamHeaders)
- .withDeletable(meta.isDeletable()).build();
+ .withFenodes(meta.getFenodes())
+ .withDatabase(meta.getDatabase())
+ .withTable(meta.getTable())
+ .withUsername(meta.getUsername())
+ .withPassword(meta.getPassword())
+ .withBufferFlushMaxBytes(meta.getBufferFlushMaxBytes())
+ .withBufferFlushMaxRows(meta.getBufferFlushMaxRows())
+ .withMaxRetries(meta.getMaxRetries())
+ .withStreamLoadProp(streamHeaders)
+ .withDeletable(meta.isDeletable()).build();
logDetailed("Initializing step with options: " + options.toString());
streamLoad = new DorisBatchStreamLoad(options, log);
diff --git
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
index 226f5f61e73..de5897e2cdd 100644
---
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
+++
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisBatchStreamLoad.java
@@ -49,7 +49,9 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import static
org.pentaho.di.trans.steps.dorisstreamloader.load.LoadConstants.ARROW;
@@ -93,6 +95,7 @@ public class DorisBatchStreamLoad implements Serializable {
private final AtomicLong currentCacheBytes = new AtomicLong(0L);
private final Lock lock = new ReentrantLock();
private final Condition block = lock.newCondition();
+ private final Map<String, ReadWriteLock> bufferMapLock = new
ConcurrentHashMap<>();
private final int FLUSH_QUEUE_SIZE = 2;
private DorisOptions options;
private LogChannelInterface log;
@@ -188,6 +191,8 @@ public class DorisBatchStreamLoad implements Serializable {
public void writeRecord(String database, String table, byte[] record) {
checkFlushException();
String bufferKey = getTableIdentifier(database, table);
+
+ getLock(bufferKey).readLock().lock();
BatchRecordBuffer buffer =
bufferMap.computeIfAbsent(
bufferKey,
@@ -200,6 +205,8 @@ public class DorisBatchStreamLoad implements Serializable {
int bytes = buffer.insert(record);
currentCacheBytes.addAndGet(bytes);
+ getLock(bufferKey).readLock().unlock();
+
if (currentCacheBytes.get() > maxBlockedBytes) {
lock.lock();
try {
@@ -241,7 +248,7 @@ public class DorisBatchStreamLoad implements Serializable {
* Force flush and wait for success.
* @return
*/
- public boolean forceFlush() {
+ public boolean forceFlush() {
return doFlush(null, true, false);
}
@@ -259,8 +266,9 @@ public class DorisBatchStreamLoad implements Serializable {
}
private synchronized boolean flush(String bufferKey, boolean waitUtilDone)
{
- if (bufferMap.isEmpty()) {
+ if (!waitUtilDone && bufferMap.isEmpty()) {
// bufferMap may have been flushed by other threads
+ log.logDetailed("bufferMap is empty, no need to flush {}",
bufferKey);
return false;
}
if (null == bufferKey) {
@@ -288,12 +296,21 @@ public class DorisBatchStreamLoad implements Serializable
{
}
private synchronized void flushBuffer(String bufferKey) {
- BatchRecordBuffer buffer = bufferMap.get(bufferKey);
+ BatchRecordBuffer buffer;
+ try {
+ getLock(bufferKey).writeLock().lock();
+ buffer = bufferMap.remove(bufferKey);
+ } finally {
+ getLock(bufferKey).writeLock().unlock();
+ }
+ if (buffer == null) {
+ log.logDetailed("buffer key is not exist {}, skipped", bufferKey);
+ return;
+ }
String label = String.format("%s_%s_%s", "kettle", buffer.getTable(),
UUID.randomUUID());
buffer.setLabelName(label);
log.logDetailed("Flush buffer, table " + bufferKey + ", records " +
buffer.getNumOfRecords());
putRecordToFlushQueue(buffer);
- bufferMap.remove(bufferKey);
}
private void putRecordToFlushQueue(BatchRecordBuffer buffer) {
@@ -306,6 +323,9 @@ public class DorisBatchStreamLoad implements Serializable {
} catch (InterruptedException e) {
throw new RuntimeException("Failed to put record buffer to flush
queue");
}
+ // When the load thread reports an error, the flushQueue will be
cleared,
+ // and need to force a check for the exception.
+ checkFlushException();
}
private void checkFlushException() {
@@ -317,7 +337,7 @@ public class DorisBatchStreamLoad implements Serializable {
private void waitAsyncLoadFinish() {
// Because the flush thread will drainTo once after polling is
completed
// if queue_size is 2, at least 4 empty queues must be consumed to
ensure that flush has been completed
- for (int i = 0; i < FLUSH_QUEUE_SIZE * 2; i++) {
+ for (int i = 0; i < FLUSH_QUEUE_SIZE * 2 + 1; i++) {
BatchRecordBuffer empty = new BatchRecordBuffer();
putRecordToFlushQueue(empty);
}
@@ -331,8 +351,6 @@ public class DorisBatchStreamLoad implements Serializable {
// close async executor
this.loadExecutorService.shutdown();
this.started.set(false);
- // clear buffer
- this.flushQueue.clear();
}
public boolean mergeBuffer(List<BatchRecordBuffer> recordList,
BatchRecordBuffer buffer) {
@@ -381,6 +399,10 @@ public class DorisBatchStreamLoad implements Serializable {
return true;
}
+ private ReadWriteLock getLock(String bufferKey) {
+ return bufferMapLock.computeIfAbsent(bufferKey, k -> new
ReentrantReadWriteLock());
+ }
+
class LoadAsyncExecutor implements Runnable {
private int flushQueueSize;
@@ -482,11 +504,6 @@ public class DorisBatchStreamLoad implements Serializable {
lock.unlock();
}
return;
- } else if (LoadStatus.LABEL_ALREADY_EXIST.equals(
- respContent.getStatus())) {
- // todo: need to abort transaction when
JobStatus not finished
- putBuilder.setLabel(label + "_" + retry);
- reason = respContent.getMessage();
} else {
String errMsg = null;
if
(StringUtils.isBlank(respContent.getMessage())
@@ -523,6 +540,12 @@ public class DorisBatchStreamLoad implements Serializable {
// get available backend retry
refreshLoadUrl(buffer.getDatabase(), buffer.getTable());
putBuilder.setUrl(loadUrl);
+ putBuilder.setLabel(label + "_" + retry);
+
+ try {
+ Thread.sleep(1000L * retry);
+ } catch (InterruptedException e) {
+ }
}
buffer.clear();
buffer = null;
diff --git
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
index 202b996cafe..2d6489dc48d 100644
---
a/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
+++
b/extension/kettle/impl/src/main/java/org/pentaho/di/trans/steps/dorisstreamloader/load/DorisOptions.java
@@ -179,7 +179,7 @@ public class DorisOptions {
Preconditions.checkArgument(database != null, "Database must not
be null");
Preconditions.checkArgument(table != null, "Table must not be
null");
Preconditions.checkArgument(bufferFlushMaxRows >= 10000,
"BufferFlushMaxRows must be greater than 10000");
- Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 *
1024, "BufferFlushMaxBytes must be greater than 10MB");
+ Preconditions.checkArgument(bufferFlushMaxBytes >= 10 * 1024 *
1024, "BufferFlushMaxBytes must be greater than 10485760(10MB)");
Preconditions.checkArgument(maxRetries >= 0, "MaxRetries must be
greater than 0");
return new DorisOptions(fenodes, username, password, database,
table, bufferFlushMaxRows, bufferFlushMaxBytes, streamLoadProp, maxRetries,
deletable);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]