This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new 74e063d69e Some Fixes to the Doris Bulk Stream Loader (#5973)
74e063d69e is described below
commit 74e063d69e4015505c7b01058e87ed00e23b29d5
Author: Davide Villa <[email protected]>
AuthorDate: Fri Nov 14 16:06:31 2025 +0100
Some Fixes to the Doris Bulk Stream Loader (#5973)
* Corrected typo in the NoInputReceived message key.
Added Italian translation.
* Corrected typo in a default constant name (EXCEPT_DEFAULT -> EXPECT_DEFAULT).
Added default values for buffer size and buffer count.
Fixed a possible buffer underrun that leads to an infinite wait
on the buffer writing queue.
* Added gzip compression for CSV data format
---------
Co-authored-by: davidev <davidev@tuxedo-davidev>
---
.../dorisbulkloader/DorisBulkLoaderDialog.java | 2 +-
.../dorisbulkloader/DorisBulkLoaderMeta.java | 2 +-
.../dorisbulkloader/DorisStreamLoad.java | 14 ++++++-
.../transforms/dorisbulkloader/LoadConstants.java | 7 +++-
.../transforms/dorisbulkloader/RecordBuffer.java | 2 +-
.../transforms/dorisbulkloader/RecordStream.java | 9 +++++
.../messages/messages_en_US.properties | 2 +-
.../messages/messages_it_IT.properties | 46 ++++++++++++++++++++++
.../messages/messages_pt_BR.properties | 22 +++++------
9 files changed, 88 insertions(+), 18 deletions(-)
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderDialog.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderDialog.java
index 632719e960..17b77a7f56 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderDialog.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderDialog.java
@@ -675,7 +675,7 @@ public class DorisBulkLoaderDialog extends
BaseTransformDialog {
}
wBufferSize.setText(Integer.toString(input.getBufferSize()));
- wBufferCount.setText(Integer.toString(input.getBufferSize()));
+ wBufferCount.setText(Integer.toString(input.getBufferCount()));
wTransformName.selectAll();
wTransformName.setFocus();
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderMeta.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderMeta.java
index df4ea1491f..ea85b924bb 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderMeta.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisBulkLoaderMeta.java
@@ -114,7 +114,7 @@ public class DorisBulkLoaderMeta extends
BaseTransformMeta<DorisBulkLoader, Dori
cr =
new CheckResult(
ICheckResult.TYPE_RESULT_ERROR,
- BaseMessages.getString(PKG,
"DorisBulkLoaderMeta.CheckResult.NoInpuReceived"),
+ BaseMessages.getString(PKG,
"DorisBulkLoaderMeta.CheckResult.NoInputReceived"),
transformMeta);
}
remarks.add(cr);
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisStreamLoad.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisStreamLoad.java
index b9549cc227..a3664be5fb 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisStreamLoad.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/DorisStreamLoad.java
@@ -27,6 +27,7 @@ import org.apache.commons.codec.binary.Base64;
import org.apache.hop.core.encryption.Encr;
import org.apache.hop.core.json.HopJson;
import org.apache.http.HttpHeaders;
+import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.InputStreamEntity;
@@ -155,18 +156,27 @@ public class DorisStreamLoad {
*/
public ResponseContent executeDorisStreamLoad() throws IOException,
DorisStreamLoadException {
HttpPut put = new HttpPut(loadUrl);
- put.setHeader(HttpHeaders.EXPECT, LoadConstants.EXCEPT_DEFAULT);
+ put.setHeader(HttpHeaders.EXPECT, LoadConstants.EXPECT_DEFAULT);
put.setHeader(HttpHeaders.AUTHORIZATION, basicAuthHeader(loginUser,
loginPassword));
put.setHeader(
LoadConstants.LABEL_KEY, LoadConstants.LABEL_SUFFIX +
UUID.randomUUID().toString());
if (LoadConstants.JSON.equals(format)) {
put.setHeader(LoadConstants.STRIP_OUTER_ARRAY_KEY,
LoadConstants.STRIP_OUTER_ARRAY_DEFAULT);
+ } else {
+ put.setHeader(LoadConstants.COMPRESS_TYPE_KEY,
LoadConstants.COMPRESS_FORMAT_GZ);
+ put.setHeader(HttpHeaders.CONTENT_ENCODING, LoadConstants.GZIP_ENCODING);
}
httpHeaders.forEach(put::setHeader);
InputStreamEntity entity = new InputStreamEntity(recordStream,
recordStream.getWriteLength());
entity.setChunked(false);
- put.setEntity(entity);
+
+ if (LoadConstants.JSON.equals(format)) {
+ put.setEntity(entity);
+ } else {
+ GzipCompressingEntity gzipEntity = new GzipCompressingEntity(entity);
+ put.setEntity(gzipEntity);
+ }
if (httpClient == null) {
httpClient =
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/LoadConstants.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/LoadConstants.java
index 46a0cb2f58..8771ddb84f 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/LoadConstants.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/LoadConstants.java
@@ -24,12 +24,13 @@ public class LoadConstants {
public static final String CSV = "csv";
public static final String LABEL_KEY = "label";
public static final String FORMAT_KEY = "format";
+ public static final String COMPRESS_TYPE_KEY = "compress_type";
public static final String FIELD_DELIMITER_KEY = "column_separator";
public static final String FIELD_DELIMITER_DEFAULT = ",";
public static final String LINE_DELIMITER_KEY = "line_delimiter";
public static final String LINE_DELIMITER_DEFAULT = "\\n";
public static final String LINE_DELIMITER_JSON = ",";
- public static final String EXCEPT_DEFAULT = "100-continue";
+ public static final String EXPECT_DEFAULT = "100-continue";
public static final String STRIP_OUTER_ARRAY_KEY = "strip_outer_array";
public static final String STRIP_OUTER_ARRAY_DEFAULT = "true";
public static final String NULL_VALUE = "\\N";
@@ -38,4 +39,8 @@ public class LoadConstants {
public static final String LOAD_URL_PATTERN =
"http://%s:%s/api/%s/%s/_stream_load";
public static final String JSON_ARRAY_START = "[";
public static final String JSON_ARRAY_END = "]";
+ public static final String GZIP_ENCODING = "gzip";
+ public static final String COMPRESS_FORMAT_GZ = "gz";
+ public static final int DEFAULT_BUFFER_SIZE = 50000;
+ public static final int DEFAULT_BUFFER_COUNT = 256;
}
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordBuffer.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordBuffer.java
index 7ab4cf4b2e..bf930f6277 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordBuffer.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordBuffer.java
@@ -95,7 +95,7 @@ public class RecordBuffer {
* @return
*/
public boolean canWrite(long writeLength) {
- return this.writeLength + writeLength <= bufferSize * bufferCount;
+ return this.writeLength + writeLength <= bufferSize * (bufferCount - 1);
}
/**
diff --git
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordStream.java
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordStream.java
index 6531e62fa8..b78ba1c381 100644
---
a/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordStream.java
+++
b/plugins/transforms/dorisbulkloader/src/main/java/org/apache/hop/pipeline/transforms/dorisbulkloader/RecordStream.java
@@ -34,6 +34,15 @@ public class RecordStream extends InputStream {
* real stream load
*/
public RecordStream(int bufferSize, int bufferCount) {
+
+ // Check for valid values
+ if (bufferSize == 0) {
+ bufferSize = LoadConstants.DEFAULT_BUFFER_SIZE;
+ }
+ if (bufferCount == 0) {
+ bufferCount = LoadConstants.DEFAULT_BUFFER_COUNT;
+ }
+
this.recordBuffer = new RecordBuffer(bufferSize, bufferCount);
}
diff --git
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_en_US.properties
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_en_US.properties
index 36a267a5d6..4e1656509e 100644
---
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_en_US.properties
+++
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_en_US.properties
@@ -58,7 +58,7 @@ DorisBulkLoaderDialog.Shell.Title=Doris bulk loader
DorisBulkLoaderDialog.TableName.Label=Table Name
DorisBulkLoaderDialog.TableName.Tooltip=Doris table name to stream load
DorisBulkLoaderDialog.TransformName.Label=Transform name
-DorisBulkLoaderMeta.CheckResult.NoInpuReceived=No input received from other
transforms\!
+DorisBulkLoaderMeta.CheckResult.NoInputReceived=No input received from other
transforms\!
DorisBulkLoaderMeta.CheckResult.ReceivingInfoFromOtherTransforms=Transform is
receiving info from other transforms.
DorisBulkLoaderMeta.Exception.UnableToReadTransformMeta=Unable to read
transform information from XML
DorisBulkLoaderMeta.keyword=dorisBulkLoader
diff --git
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_it_IT.properties
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_it_IT.properties
index 1692cdb472..e8123840eb 100644
---
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_it_IT.properties
+++
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_it_IT.properties
@@ -17,3 +17,49 @@
#
#
+BaseTransform.TypeLongDesc.DorisBulkLoader=Caricatore massivo Doris
+BaseTransform.TypeTooltipDesc.DorisBulkLoader=Chiama l'API HTTP di caricamento
in streaming di Doris per caricare i dati
+DorisBulkLoader.ErrorInTransformRunning=A causa di un errore, questa
trasformazione non pu\u00f2 continuare
+DorisBulkLoader.Log.ExceedBufferLimit=I dati della riga corrente superano la
dimensione massima del buffer per il caricamento in streaming
+DorisBulkLoader.Log.LineNumber=numero riga
+DorisBulkLoader.Log.StreamLoadParameter=Il parametro corrente del caricamento
in streaming \u00e8: {0}
+DorisBulkLoader.Log.StreamLoadResult=Il risultato corrente del caricamento in
streaming \u00e8: {0}
+DorisBulkLoader.Log.StreamLoadRowValue=Aggiunta del valore di riga per il
caricamento in streaming di Doris [{0}]
+DorisBulkLoaderDialog.BufferCount.Label=Conteggio buffer
+DorisBulkLoaderDialog.BufferCount.Tooltip=BufferSize * BufferCount \u00e8 la
capacit\u00e0 massima per memorizzare i dati prima del caricamento reale in
streaming
+DorisBulkLoaderDialog.BufferSize.Label=Dimensione buffer
+DorisBulkLoaderDialog.BufferSize.Tooltip=Capacit\u00e0 di un buffer, in byte.
+DorisBulkLoaderDialog.BulkData.Label=Campo dati
+DorisBulkLoaderDialog.BulkData.Tooltip=Il campo dati proveniente dai campi a
monte viene utilizzato come dati per il caricamento in streaming
+DorisBulkLoaderDialog.BulkDataGroup.Label=Dati massivi
+DorisBulkLoaderDialog.ColumnDelimiter.Label=Separatore di colonne
+DorisBulkLoaderDialog.ColumnDelimiter.Tooltip=Il separatore di colonne dei
dati di caricamento in streaming di Doris \u00e8 utilizzato lato server Doris
+DorisBulkLoaderDialog.ColumnInfo.Header=Intestazione
+DorisBulkLoaderDialog.ColumnInfo.Value=Valore
+DorisBulkLoaderDialog.ConnectionsGroup.Label=Connessioni
+DorisBulkLoaderDialog.DatabaseName.Label=Nome del database
+DorisBulkLoaderDialog.DatabaseName.Tooltip=Nome del database Doris su cui
eseguire il caricamento in streaming
+DorisBulkLoaderDialog.FeHost.Label=Host FE
+DorisBulkLoaderDialog.FeHost.Tooltip=Nome di dominio o IP dell'host FE di Doris
+DorisBulkLoaderDialog.FeRestPort.Label=Porta HTTP FE
+DorisBulkLoaderDialog.FeRestPort.Tooltip=Porta HTTP dell'FE di Doris
+DorisBulkLoaderDialog.Format.Label=Formato
+DorisBulkLoaderDialog.Format.Tooltip=Il formato dei dati di caricamento in
streaming di Doris \u00e8 utilizzato lato server Doris
+DorisBulkLoaderDialog.GeneralTab.Title=Generale
+DorisBulkLoaderDialog.Headers.Label=Intestazione
+DorisBulkLoaderDialog.Headers.Title=Intestazioni
+DorisBulkLoaderDialog.HttpLogin.Label=Utente di accesso
+DorisBulkLoaderDialog.HttpLogin.Tooltip=L'utente di accesso viene utilizzato
per autenticare l'API HTTP di caricamento in streaming di Doris
+DorisBulkLoaderDialog.HttpPassword.Label=Password di accesso
+DorisBulkLoaderDialog.HttpPassword.Tooltip=La password di accesso viene
utilizzata per autenticare l'API HTTP di caricamento in streaming di Doris
+DorisBulkLoaderDialog.LineDelimiter.Label=Delimitatore di riga
+DorisBulkLoaderDialog.LineDelimiter.Tooltip=Il delimitatore di riga dei dati
di caricamento in streaming di Doris \u00e8 utilizzato lato server Doris
+DorisBulkLoaderDialog.Log.GettingKeyInfo=recupero delle informazioni chiave...
+DorisBulkLoaderDialog.Shell.Title=Caricatore massivo Doris
+DorisBulkLoaderDialog.TableName.Label=Nome della tabella
+DorisBulkLoaderDialog.TableName.Tooltip=Nome della tabella Doris su cui
eseguire il caricamento in streaming
+DorisBulkLoaderDialog.TransformName.Label=Nome della trasformazione
+DorisBulkLoaderMeta.CheckResult.NoInputReceived=Nessun input ricevuto da altre
trasformazioni\!
+DorisBulkLoaderMeta.CheckResult.ReceivingInfoFromOtherTransforms=La
trasformazione sta ricevendo informazioni da altre trasformazioni.
+DorisBulkLoaderMeta.Exception.UnableToReadTransformMeta=Impossibile leggere le
informazioni della trasformazione dal file XML
+
diff --git
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_pt_BR.properties
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_pt_BR.properties
index 2f622aa2a0..f97793eee4 100644
---
a/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_pt_BR.properties
+++
b/plugins/transforms/dorisbulkloader/src/main/resources/org/apache/hop/pipeline/transforms/dorisbulkloader/messages/messages_pt_BR.properties
@@ -18,22 +18,22 @@
#
DorisBulkLoader.ErrorCode=Carregador em volume Doris
-DorisBulkLoader.ErrorInTransformRunning=Por causa de um erro esta
transforma\u00E7\u00E3o n\u00E3o pode continuar
-DorisBulkLoader.Log.LineNumber=n\u00FAmero da linha
+DorisBulkLoader.ErrorInTransformRunning=Por causa de um erro esta
transforma\u00e7\u00e3o n\u00e3o pode continuar
+DorisBulkLoader.Log.LineNumber=n\u00famero da linha
DorisBulkLoaderDialog.BulkDataGroup.Label=Dados em massa
-DorisBulkLoaderDialog.ColumnInfo.Header=Cabe\u00E7alho
+DorisBulkLoaderDialog.ColumnInfo.Header=Cabe\u00e7alho
DorisBulkLoaderDialog.ColumnInfo.Value=Valor
-DorisBulkLoaderDialog.ConnectionsGroup.Label=Conex\u00F5es
+DorisBulkLoaderDialog.ConnectionsGroup.Label=Conex\u00f5es
DorisBulkLoaderDialog.DatabaseName.Label=Nome da base de dados
DorisBulkLoaderDialog.Format.Label=Formato
DorisBulkLoaderDialog.GeneralTab.Title=Geral
-DorisBulkLoaderDialog.Headers.Label=Cabe\u00E7alho
-DorisBulkLoaderDialog.Headers.Title=Cabe\u00E7alhos
-DorisBulkLoaderDialog.Log.GettingKeyInfo=obtendo informa\u00E7\u00E3o da
chave...
+DorisBulkLoaderDialog.Headers.Label=Cabe\u00e7alho
+DorisBulkLoaderDialog.Headers.Title=Cabe\u00e7alhos
+DorisBulkLoaderDialog.Log.GettingKeyInfo=obtendo informa\u00e7\u00e3o da
chave...
DorisBulkLoaderDialog.TableName.Label=Nome da tabela
-DorisBulkLoaderDialog.TransformName.Label=Nome da transforma\u00E7\u00E3o
-DorisBulkLoaderMeta.CheckResult.NoInpuReceived=Nenhuma entrada foi recebida de
outras transforma\u00E7\u00F5es !
-DorisBulkLoaderMeta.CheckResult.ReceivingInfoFromOtherTransforms=A
transforma\u00E7\u00E3o est\u00E1 recebendo informa\u00E7\u00F5es de outras
transforma\u00E7\u00F5es.
-DorisBulkLoaderMeta.Exception.UnableToReadTransformMeta=N\u00E3o foi
poss\u00EDvel ler a informa\u00E7\u00E3o da transforma\u00E7\u00E3o do XML
+DorisBulkLoaderDialog.TransformName.Label=Nome da transforma\u00e7\u00e3o
+DorisBulkLoaderMeta.CheckResult.NoInputReceived=Nenhuma entrada foi recebida
de outras transforma\u00e7\u00f5es !
+DorisBulkLoaderMeta.CheckResult.ReceivingInfoFromOtherTransforms=A
transforma\u00e7\u00e3o est\u00e1 recebendo informa\u00e7\u00f5es de outras
transforma\u00e7\u00f5es.
+DorisBulkLoaderMeta.Exception.UnableToReadTransformMeta=N\u00e3o foi
poss\u00edvel ler a informa\u00e7\u00e3o da transforma\u00e7\u00e3o do XML
DorisBulkLoaderMeta.keyword=Carregador em volume Doris
BaseTransform.TypeLongDesc.DorisBulkLoader=Carregador em massa Doris