This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new d65c0223 [Improve] support gz compress in streamload (#434)
d65c0223 is described below
commit d65c0223362dc841a1db25cc11b3c487db7c13e6
Author: wudi <[email protected]>
AuthorDate: Mon Jul 22 15:49:26 2024 +0800
[Improve] support gz compress in streamload (#434)
---
.../flink/sink/batch/DorisBatchStreamLoad.java | 9 +++++
.../doris/flink/sink/writer/DorisStreamLoad.java | 10 +++++
.../doris/flink/sink/writer/LoadConstants.java | 2 +
.../apache/doris/flink/sink/DorisSinkITCase.java | 43 ++++++++++++++++++++++
4 files changed, 64 insertions(+)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
index 375e4335..c5614c31 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java
@@ -34,6 +34,7 @@ import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.HttpUtil;
import org.apache.doris.flink.sink.writer.LabelGenerator;
+import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -65,6 +66,8 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.apache.doris.flink.sink.LoadStatus.PUBLISH_TIMEOUT;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
@@ -98,6 +101,7 @@ public class DorisBatchStreamLoad implements Serializable {
private HttpClientBuilder httpClientBuilder = new HttpUtil().getHttpClientBuilderForBatch();
private BackendUtil backendUtil;
private boolean enableGroupCommit;
+ private boolean enableGzCompress;
public DorisBatchStreamLoad(
DorisOptions dorisOptions,
@@ -128,6 +132,7 @@ public class DorisBatchStreamLoad implements Serializable {
&& !loadProps
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
+ this.enableGzCompress = loadProps.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
this.executionOptions = executionOptions;
this.flushQueue = new LinkedBlockingDeque<>(executionOptions.getFlushQueueSize());
if (StringUtils.isNotBlank(dorisOptions.getTableIdentifier())) {
@@ -285,6 +290,10 @@ public class DorisBatchStreamLoad implements Serializable {
.addHiddenColumns(executionOptions.getDeletable())
.addProperties(executionOptions.getStreamLoadProp());
+ if (enableGzCompress) {
+ putBuilder.setEntity(new GzipCompressingEntity(entity));
+ }
+
Throwable resEx = new Throwable();
int retry = 0;
while (retry <= executionOptions.getMaxRetries()) {
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
index 4cbcb431..060bccb5 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java
@@ -33,6 +33,7 @@ import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.EscapeHandler;
import org.apache.doris.flink.sink.HttpPutBuilder;
import org.apache.doris.flink.sink.ResponseUtil;
+import org.apache.http.client.entity.GzipCompressingEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.entity.InputStreamEntity;
import org.apache.http.impl.client.CloseableHttpClient;
@@ -56,6 +57,8 @@ import static org.apache.doris.flink.sink.LoadStatus.LABEL_ALREADY_EXIST;
import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
import static org.apache.doris.flink.sink.ResponseUtil.LABEL_EXIST_PATTERN;
import static org.apache.doris.flink.sink.writer.LoadConstants.ARROW;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE;
+import static org.apache.doris.flink.sink.writer.LoadConstants.COMPRESS_TYPE_GZ;
import static org.apache.doris.flink.sink.writer.LoadConstants.CSV;
import static org.apache.doris.flink.sink.writer.LoadConstants.FORMAT_KEY;
import static org.apache.doris.flink.sink.writer.LoadConstants.GROUP_COMMIT;
@@ -90,6 +93,7 @@ public class DorisStreamLoad implements Serializable {
private boolean loadBatchFirstRecord;
private volatile String currentLabel;
private boolean enableGroupCommit;
+ private boolean enableGzCompress;
public DorisStreamLoad(
String hostPort,
@@ -137,6 +141,8 @@ public class DorisStreamLoad implements Serializable {
&& !streamLoadProp
.getProperty(GROUP_COMMIT)
.equalsIgnoreCase(GROUP_COMMIT_OFF_MODE);
+ this.enableGzCompress =
+ streamLoadProp.getProperty(COMPRESS_TYPE, "").equals(COMPRESS_TYPE_GZ);
loadBatchFirstRecord = true;
}
@@ -319,6 +325,10 @@ public class DorisStreamLoad implements Serializable {
putBuilder.enable2PC();
}
+ if (enableGzCompress) {
+ putBuilder.setEntity(new GzipCompressingEntity(entity));
+ }
+
String executeMessage;
if (enableGroupCommit) {
executeMessage = "table " + table + " start execute load with group commit";
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
index e8cd87e6..1e026977 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/LoadConstants.java
@@ -33,4 +33,6 @@ public class LoadConstants {
public static final String READ_JSON_BY_LINE = "read_json_by_line";
public static final String GROUP_COMMIT = "group_commit";
public static final String GROUP_COMMIT_OFF_MODE = "off_mode";
+ public static final String COMPRESS_TYPE = "compress_type";
+ public static final String COMPRESS_TYPE_GZ = "gz";
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
index 91077ea2..aa3d00da 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java
@@ -59,6 +59,7 @@ public class DorisSinkITCase extends DorisTestBase {
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
+ static final String TABLE_GZ_FORMAT = "tbl_gz_format";
static final String TABLE_CSV_JM = "tbl_csv_jm";
static final String TABLE_CSV_TM = "tbl_csv_tm";
@@ -315,6 +316,48 @@ public class DorisSinkITCase extends DorisTestBase {
checkResult(expected, query, 2);
}
+ @Test
+ public void testTableGzFormat() throws Exception {
+ initializeTable(TABLE_GZ_FORMAT);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ String sinkDDL =
+ String.format(
+ "CREATE TABLE doris_gz_format_sink ("
+ + " name STRING,"
+ + " age INT"
+ + ") WITH ("
+ + " 'connector' = 'doris',"
+ + " 'fenodes' = '%s',"
+ + " 'table.identifier' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'sink.label-prefix' = '"
+ + UUID.randomUUID()
+ + "',"
+ + " 'sink.properties.column_separator' = '\\x01',"
+ + " 'sink.properties.line_delimiter' = '\\x02',"
+ + " 'sink.properties.compress_type' = 'gz'"
+ + ")",
+ getFenodes(),
+ DATABASE + "." + TABLE_GZ_FORMAT,
+ USERNAME,
+ PASSWORD);
+ tEnv.executeSql(sinkDDL);
+ tEnv.executeSql(
+ "INSERT INTO doris_gz_format_sink SELECT 'doris',1 union all SELECT 'flink',2");
+
+ Thread.sleep(25000);
+ List<String> expected = Arrays.asList("doris,1", "flink,2");
+ String query =
+ String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_GZ_FORMAT);
+ //
+ checkResult(expected, query, 2);
+ }
+
@Test
public void testJobManagerFailoverSink() throws Exception {
initializeFailoverTable(TABLE_CSV_JM);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]