This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new b31a9572689 branch-4.0: [Improve](StreamingJob) add max_filter_ratio and strict mode for mysql/pg streaming job #60473 (#60527)
b31a9572689 is described below
commit b31a957268965deedb521ff8e83fbe12af682fc7
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 5 17:42:22 2026 +0800
branch-4.0: [Improve](StreamingJob) add max_filter_ratio and strict mode for mysql/pg streaming job #60473 (#60527)
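
For reference, a minimal usage sketch of the new target-side load properties (job and database names are placeholders, and the example assumes the "load." prefix with the max_filter_ratio / strict_mode keys introduced in this change):

    -- Illustrative only: names and exact property keys are placeholders
    -- based on the regression tests in this change.
    ALTER JOB my_mysql_streaming_job
    FROM MYSQL
    TO DATABASE target_db (
        "load.max_filter_ratio" = "0.1",
        "load.strict_mode" = "true"
    )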
Cherry-picked from #60473
Co-authored-by: wudi <[email protected]>
---
.../apache/doris/job/cdc/DataSourceConfigKeys.java | 4 ++
...RecordRequest.java => CommitOffsetRequest.java} | 26 +++++---
.../doris/job/cdc/request/FetchRecordRequest.java | 12 ----
.../job/cdc/request/JobBaseRecordRequest.java | 4 --
.../doris/job/cdc/request/WriteRecordRequest.java | 13 +---
.../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +-
.../doris/httpv2/rest/StreamingJobAction.java | 17 +----
.../streaming/DataSourceConfigValidator.java | 16 +++--
.../insert/streaming/StreamingInsertJob.java | 74 ++++++++++++++++++++--
.../streaming/StreamingJobSchedulerTask.java | 1 +
.../insert/streaming/StreamingJobStatistic.java | 3 +
.../insert/streaming/StreamingMultiTblTask.java | 37 +++++++++--
.../apache/doris/job/util/StreamingJobUtils.java | 5 +-
.../cdcclient/service/PipelineCoordinator.java | 10 +--
.../doris/cdcclient/sink/DorisBatchStreamLoad.java | 31 ++++++---
.../doris/cdcclient/sink/HttpPutBuilder.java | 9 +--
.../apache/doris/cdcclient/sink/LoadStatistic.java | 25 ++++----
.../cdc/test_streaming_mysql_job_errormsg.out | 4 ++
.../cdc/test_streaming_mysql_job.groovy | 4 +-
.../test_streaming_mysql_job_create_alter.groovy | 2 +-
.../cdc/test_streaming_mysql_job_errormsg.groovy | 45 ++++++++++++-
.../cdc/test_streaming_mysql_job_restart_fe.groovy | 10 ++-
.../cdc/test_streaming_postgres_job.groovy | 4 +-
.../cdc/test_streaming_postgres_job_split.groovy | 6 +-
.../streaming_job/test_streaming_insert_job.groovy | 6 +-
.../test_streaming_insert_job_offset.groovy | 12 +++-
...st_streaming_job_alter_offset_restart_fe.groovy | 24 +++++--
.../test_streaming_job_restart_fe.groovy | 12 +++-
28 files changed, 300 insertions(+), 118 deletions(-)
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
index d29cdef0766..17c5d7d575e 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java
@@ -35,4 +35,8 @@ public class DataSourceConfigKeys {
public static final String SNAPSHOT_SPLIT_SIZE = "snapshot_split_size";
public static final String SNAPSHOT_PARALLELISM = "snapshot_parallelism";
public static final String SNAPSHOT_PARALLELISM_DEFAULT = "1";
+
+ // target properties
+ public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
+ public static final String LOAD_PROPERTIES = "load.";
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
similarity index 69%
copy from
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
copy to
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
index a9a1be374db..3d2d221ea49 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java
@@ -17,19 +17,25 @@
package org.apache.doris.job.cdc.request;
-import lombok.EqualsAndHashCode;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.Setter;
-
-import java.util.Map;
+import lombok.ToString;
@Getter
@Setter
-@EqualsAndHashCode(callSuper = true)
-public abstract class JobBaseRecordRequest extends JobBaseConfig {
- protected Map<String, Object> meta;
-
- public abstract boolean isReload();
-
- public abstract int getFetchSize();
+@NoArgsConstructor
+@ToString
+@AllArgsConstructor
+@Builder
+public class CommitOffsetRequest {
+ public long jobId;
+ public long taskId;
+ public String offset;
+ public long scannedRows;
+ public long filteredRows;
+ public long loadedRows;
+ public long loadBytes;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
index f11539e6832..7ed28d618fd 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
@@ -23,16 +23,4 @@ import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = true)
public class FetchRecordRequest extends JobBaseRecordRequest {
- private boolean reload = true;
- private int fetchSize;
-
- @Override
- public boolean isReload() {
- return reload;
- }
-
- @Override
- public int getFetchSize() {
- return fetchSize;
- }
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
index a9a1be374db..282913e2dd2 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java
@@ -28,8 +28,4 @@ import java.util.Map;
@EqualsAndHashCode(callSuper = true)
public abstract class JobBaseRecordRequest extends JobBaseConfig {
protected Map<String, Object> meta;
-
- public abstract boolean isReload();
-
- public abstract int getFetchSize();
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
index a75edfcf7fb..195125b1c66 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
+++
b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/WriteRecordRequest.java
@@ -20,6 +20,8 @@ package org.apache.doris.job.cdc.request;
import lombok.Data;
import lombok.EqualsAndHashCode;
+import java.util.Map;
+
@Data
@EqualsAndHashCode(callSuper = true)
public class WriteRecordRequest extends JobBaseRecordRequest {
@@ -28,14 +30,5 @@ public class WriteRecordRequest extends JobBaseRecordRequest
{
private String token;
private String frontendAddress;
private String taskId;
-
- @Override
- public boolean isReload() {
- return true;
- }
-
- @Override
- public int getFetchSize() {
- return Integer.MAX_VALUE;
- }
+ private Map<String, String> streamLoadProps;
}
diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index abef3617806..2985b77582f 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -100,7 +100,7 @@ materializedViewStatement
;
jobFromToClause
-    : FROM sourceType=identifier LEFT_PAREN sourceProperties=propertyItemList RIGHT_PAREN
+    : FROM sourceType=identifier (LEFT_PAREN sourceProperties=propertyItemList RIGHT_PAREN)?
        TO DATABASE targetDb=identifier (LEFT_PAREN targetProperties=propertyItemList RIGHT_PAREN)?
;
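
Since the source property list is now optional, an ALTER JOB can update only the target-side properties without restating the source properties. A sketch (placeholder names, mirroring the ALTER JOB used in the regression test later in this diff):

    -- Illustrative only: no source property list is required anymore.
    ALTER JOB my_mysql_streaming_job
    FROM MYSQL
    TO DATABASE target_db (
        "load.max_filter_ratio" = "1"
    )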
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
index 573e0a17f16..34d7abe3133 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/StreamingJobAction.java
@@ -21,14 +21,11 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.job.base.AbstractJob;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import com.google.common.base.Strings;
import jakarta.servlet.http.HttpServletRequest;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import lombok.ToString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.web.bind.annotation.RequestBody;
@@ -77,16 +74,4 @@ public class StreamingJobAction extends RestBaseController {
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
}
}
-
- @Getter
- @Setter
- @NoArgsConstructor
- @ToString
- public static class CommitOffsetRequest {
- public long jobId;
- public long taskId;
- public String offset;
- public long scannedRows;
- public long scannedBytes;
- }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index ddaf8456271..cb7ec530fcf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -18,7 +18,7 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
-import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import com.google.common.collect.Sets;
@@ -59,9 +59,17 @@ public class DataSourceConfigValidator {
     public static void validateTarget(Map<String, String> input) throws IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
-            if (!key.startsWith(StreamingJobUtils.TABLE_PROPS_PREFIX)) {
-                throw new IllegalArgumentException("Only support target properties with prefix "
-                        + StreamingJobUtils.TABLE_PROPS_PREFIX);
+            if (!key.startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)
+                    && !key.startsWith(DataSourceConfigKeys.LOAD_PROPERTIES)) {
+                throw new IllegalArgumentException("Not support target properties key " + key);
+            }
+
+            if (key.equals(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY)) {
+                try {
+                    Double.parseDouble(entry.getValue());
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid value for key '" + key + "': " + entry.getValue());
+                }
             }
         }
     }
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 07ede21a5b8..43fe652ee06 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -32,11 +32,11 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
-import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.base.JobExecutionConfiguration;
import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.common.FailureReason;
import org.apache.doris.job.common.IntervalUnit;
@@ -60,6 +60,7 @@ import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.AlterJobCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateTableCommand;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.nereids.trees.plans.commands.info.BaseViewInfo;
import
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertUtils;
@@ -157,6 +158,14 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
@SerializedName("tprops")
private Map<String, String> targetProperties;
+    // The sampling window starts at sampleStartTime, which is set when the streaming task starts.
+    // If the filtered-row ratio within the window exceeds `max_filter_ratio`, the data quality check fails.
+ @Setter
+ private long sampleStartTime;
+ private long sampleWindowMs;
+ private long sampleWindowScannedRows;
+ private long sampleWindowFilteredRows;
+
public StreamingInsertJob(String jobName,
JobStatus jobStatus,
String dbName,
@@ -260,6 +269,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
try {
this.jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
+            this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 1000;
// build time definition
JobExecutionConfiguration execConfig = getJobConfig();
TimerDefinition timerDefinition = new TimerDefinition();
@@ -373,7 +383,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
// update target properties
if (!alterJobCommand.getTargetProperties().isEmpty()) {
-            this.sourceProperties.putAll(alterJobCommand.getTargetProperties());
+            this.targetProperties.putAll(alterJobCommand.getTargetProperties());
             logParts.add("target properties: " + alterJobCommand.getTargetProperties());
}
log.info("Alter streaming job {}, {}", getJobId(), String.join(", ",
logParts));
@@ -621,7 +631,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
this.jobStatistic = new StreamingJobStatistic();
}
         this.jobStatistic.setScannedRows(this.jobStatistic.getScannedRows() + offsetRequest.getScannedRows());
-        this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + offsetRequest.getScannedBytes());
+        this.jobStatistic.setLoadBytes(this.jobStatistic.getLoadBytes() + offsetRequest.getLoadBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
}
@@ -631,7 +641,9 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
         this.nonTxnJobStatistic
                 .setScannedRows(this.nonTxnJobStatistic.getScannedRows() + offsetRequest.getScannedRows());
-        this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + offsetRequest.getScannedBytes());
+        this.nonTxnJobStatistic
+                .setFilteredRows(this.nonTxnJobStatistic.getFilteredRows() + offsetRequest.getFilteredRows());
+        this.nonTxnJobStatistic.setLoadBytes(this.nonTxnJobStatistic.getLoadBytes() + offsetRequest.getLoadBytes());
offsetProvider.updateOffset(offsetProvider.deserializeOffset(offsetRequest.getOffset()));
}
@@ -1104,6 +1116,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
throw new JobException("Unsupported commit offset for offset
provider type: "
+ offsetProvider.getClass().getSimpleName());
}
+
writeLock();
try {
if (this.runningStreamTask != null
@@ -1112,12 +1125,12 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
throw new JobException("Task id mismatch when commit
offset. expected: "
+ this.runningStreamTask.getTaskId() + ", actual:
" + offsetRequest.getTaskId());
}
+ checkDataQuality(offsetRequest);
updateNoTxnJobStatisticAndOffset(offsetRequest);
-            if (offsetRequest.getScannedRows() == 0 && offsetRequest.getScannedBytes() == 0) {
+            if (offsetRequest.getScannedRows() == 0 && offsetRequest.getLoadBytes() == 0) {
JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider)
offsetProvider;
op.setHasMoreData(false);
}
-
persistOffsetProviderIfNeed();
log.info("Streaming multi table job {} task {} commit offset
successfully, offset: {}",
getJobId(), offsetRequest.getTaskId(),
offsetRequest.getOffset());
@@ -1129,6 +1142,55 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
}
+    /**
+     * Check data quality before committing the offset.
+     */
+    private void checkDataQuality(CommitOffsetRequest offsetRequest) throws JobException {
+        String maxFilterRatioStr =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+        if (maxFilterRatioStr == null) {
+            return;
+        }
+        Double maxFilterRatio = Double.parseDouble(maxFilterRatioStr.trim());
+        if (maxFilterRatio < 0 || maxFilterRatio > 1) {
+            log.warn("invalid max filter ratio {}, skip data quality check", maxFilterRatio);
+            return;
+        }
+
+        this.sampleWindowScannedRows += offsetRequest.getScannedRows();
+        this.sampleWindowFilteredRows += offsetRequest.getFilteredRows();
+
+        if (sampleWindowScannedRows <= 0) {
+            return;
+        }
+
+        double ratio = (double) sampleWindowFilteredRows / (double) sampleWindowScannedRows;
+
+        if (ratio > maxFilterRatio) {
+            String msg = String.format(
+                    "data quality check failed for streaming multi table job %d (within sample window): "
+                            + "window filtered/scanned=%.6f > maxFilterRatio=%.6f "
+                            + "(windowFiltered=%d, windowScanned=%d)",
+                    getJobId(), ratio, maxFilterRatio, sampleWindowFilteredRows, sampleWindowScannedRows);
+            log.error(msg);
+            FailureReason failureReason = new FailureReason(InternalErrorCode.TOO_MANY_FAILURE_ROWS_ERR,
+                    "too many filtered rows exceeded max_filter_ratio " + maxFilterRatio);
+            this.setFailureReason(failureReason);
+            this.updateJobStatus(JobStatus.PAUSED);
+            throw new JobException(failureReason.getMsg());
+        }
+
+        long now = System.currentTimeMillis();
+
+        if ((now - sampleStartTime) > sampleWindowMs) {
+            this.sampleStartTime = now;
+            this.sampleWindowScannedRows = 0L;
+            this.sampleWindowFilteredRows = 0L;
+            log.info("streaming multi table job {} enter next sample window, startTime={}",
+                    getJobId(), TimeUtils.longToTimeString(sampleStartTime));
+        }
+    }
+
private void persistOffsetProviderIfNeed() {
// only for jdbc
this.offsetProviderPersist = offsetProvider.getPersistInfo();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
index 942a25812db..8df18f1ee63 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobSchedulerTask.java
@@ -67,6 +67,7 @@ public class StreamingJobSchedulerTask extends AbstractTask {
}
streamingInsertJob.replayOffsetProviderIfNeed();
streamingInsertJob.createStreamingTask();
+ streamingInsertJob.setSampleStartTime(System.currentTimeMillis());
streamingInsertJob.updateJobStatus(JobStatus.RUNNING);
streamingInsertJob.setAutoResumeCount(0);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
index 71bf9e6f065..80d0da3de23 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingJobStatistic.java
@@ -38,6 +38,9 @@ public class StreamingJobStatistic {
@Getter
@Setter
private long fileSize;
+ @Getter
+ @Setter
+ private long filteredRows;
public String toJson() {
return new Gson().toJson(this);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
index 10364fe465f..0bdd6262864 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
@@ -23,8 +23,9 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
-import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
import org.apache.doris.job.base.Job;
+import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
@@ -35,6 +36,7 @@ import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.jdbc.JdbcOffset;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
+import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
@@ -51,6 +53,7 @@ import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;
+import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
import java.util.List;
@@ -69,13 +72,16 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
private String targetDb;
private StreamingJobProperties jobProperties;
private long scannedRows = 0L;
- private long scannedBytes = 0L;
+ private long loadBytes = 0L;
+ private long filteredRows = 0L;
+ private long loadedRows = 0L;
private long timeoutMs;
private long runningBackendId;
public StreamingMultiTblTask(Long jobId,
long taskId,
DataSourceType dataSourceType,
+
SourceOffsetProvider offsetProvider,
Map<String, String> sourceProperties,
String targetDb,
@@ -180,6 +186,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
request.setToken(getToken());
request.setTargetDb(targetDb);
+ Map<String, String> props = generateStreamLoadProps();
+ request.setStreamLoadProps(props);
+
Map<String, Object> splitMeta = offset.generateMeta();
Preconditions.checkArgument(!splitMeta.isEmpty(), "split meta is
empty");
request.setMeta(splitMeta);
@@ -189,6 +198,24 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
return request;
}
+    private Map<String, String> generateStreamLoadProps() {
+        Map<String, String> streamLoadProps = new HashMap<>();
+        String maxFilterRatio =
+                targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.MAX_FILTER_RATIO_PROPERTY);
+
+        if (StringUtils.isNotEmpty(maxFilterRatio) && Double.parseDouble(maxFilterRatio) > 0) {
+            // If `load.max_filter_ratio` is set, the ratio is checked on the job side over a sampling window,
+            // so the `max_filter_ratio` of each underlying stream load must be 1.
+            streamLoadProps.put(LoadCommand.MAX_FILTER_RATIO_PROPERTY, "1");
+        }
+
+        String strictMode = targetProperties.get(DataSourceConfigKeys.LOAD_PROPERTIES + LoadCommand.STRICT_MODE);
+        if (StringUtils.isNotEmpty(strictMode)) {
+            streamLoadProps.put(LoadCommand.STRICT_MODE, strictMode);
+        }
+        return streamLoadProps;
+    }
+
@Override
public boolean onSuccess() throws JobException {
if (getIsCanceled().get()) {
@@ -246,7 +273,9 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
}
this.scannedRows = offsetRequest.getScannedRows();
- this.scannedBytes = offsetRequest.getScannedBytes();
+ this.loadBytes = offsetRequest.getLoadBytes();
+ this.filteredRows = offsetRequest.getFilteredRows();
+ this.loadedRows = offsetRequest.getLoadedRows();
Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
if (null == job) {
log.info("job is null, job id is {}", jobId);
@@ -330,7 +359,7 @@ public class StreamingMultiTblTask extends
AbstractStreamingTask {
trow.addToColumnValue(new
TCell().setStringVal(FeConstants.null_string));
Map<String, Object> statistic = new HashMap<>();
statistic.put("scannedRows", scannedRows);
- statistic.put("loadBytes", scannedBytes);
+ statistic.put("loadBytes", loadBytes);
trow.addToColumnValue(new TCell().setStringVal(new
Gson().toJson(statistic)));
if (this.getUserIdentity() == null) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
index 4164dcaa262..1a1fb68fe82 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java
@@ -70,7 +70,6 @@ import java.util.stream.Collectors;
@Log4j2
public class StreamingJobUtils {
- public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
public static final String INTERNAL_STREAMING_JOB_META_TABLE_NAME =
"streaming_job_meta";
public static final String FULL_QUALIFIED_META_TBL_NAME =
InternalCatalog.INTERNAL_CATALOG_NAME
+ "." + FeConstants.INTERNAL_DB_NAME + "." +
INTERNAL_STREAMING_JOB_META_TABLE_NAME;
@@ -411,8 +410,8 @@ public class StreamingJobUtils {
private static Map<String, String> getTableCreateProperties(Map<String,
String> properties) {
final Map<String, String> tableCreateProps = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
-            if (entry.getKey().startsWith(TABLE_PROPS_PREFIX)) {
-                String subKey = entry.getKey().substring(TABLE_PROPS_PREFIX.length());
+            if (entry.getKey().startsWith(DataSourceConfigKeys.TABLE_PROPS_PREFIX)) {
+                String subKey = entry.getKey().substring(DataSourceConfigKeys.TABLE_PROPS_PREFIX.length());
tableCreateProps.put(subKey, entry.getValue());
}
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 3f6438d6ee5..614c506619f 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -239,7 +239,6 @@ public class PipelineCoordinator {
SourceReader sourceReader =
Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
- long scannedBytes = 0L;
int heartbeatCount = 0;
SplitReadResult readResult = null;
try {
@@ -318,9 +317,7 @@ public class PipelineCoordinator {
String table = extractTable(element);
for (String record : serializedRecords) {
scannedRows++;
- byte[] dataBytes = record.getBytes();
- scannedBytes += dataBytes.length;
-                    batchStreamLoad.writeRecord(database, table, dataBytes);
+                    batchStreamLoad.writeRecord(database, table, record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
@@ -349,7 +346,8 @@ public class PipelineCoordinator {
        // The offset must be reset before commitOffset to prevent the next taskId from being created by the FE.
batchStreamLoad.resetTaskId();
-        batchStreamLoad.commitOffset(currentTaskId, metaResponse, scannedRows, scannedBytes);
+        batchStreamLoad.commitOffset(
+                currentTaskId, metaResponse, scannedRows, batchStreamLoad.getLoadStatistic());
}
public static boolean isHeartbeatEvent(SourceRecord record) {
@@ -436,6 +434,8 @@ public class PipelineCoordinator {
batchStreamLoad.setCurrentTaskId(writeRecordRequest.getTaskId());
batchStreamLoad.setFrontendAddress(writeRecordRequest.getFrontendAddress());
batchStreamLoad.setToken(writeRecordRequest.getToken());
+ batchStreamLoad.setLoadProps(writeRecordRequest.getStreamLoadProps());
+ batchStreamLoad.getLoadStatistic().clear();
return batchStreamLoad;
}
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
index 926b49dcb45..92a2f9db2b6 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.sink;
import org.apache.doris.cdcclient.common.Env;
import org.apache.doris.cdcclient.exception.StreamLoadException;
import org.apache.doris.cdcclient.utils.HttpUtil;
+import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
@@ -93,9 +94,13 @@ public class DorisBatchStreamLoad implements Serializable {
private String targetDb;
private long jobId;
@Setter private String token;
+ // stream load headers
+ @Setter private Map<String, String> loadProps = new HashMap<>();
+ @Getter private LoadStatistic loadStatistic;
public DorisBatchStreamLoad(long jobId, String targetDb) {
this.hostPort = Env.getCurrentEnv().getBackendHostPort();
+ this.loadStatistic = new LoadStatistic();
this.flushQueue = new LinkedBlockingDeque<>(1);
// maxBlockedBytes is two times of FLUSH_MAX_BYTE_SIZE
this.maxBlockedBytes = STREAM_LOAD_MAX_BYTES * 2;
@@ -388,11 +393,11 @@ public class DorisBatchStreamLoad implements Serializable
{
String finalLabel = String.format("%s_%s_%s", jobId,
currentTaskId, label);
putBuilder
.setUrl(loadUrl)
+ .addProperties(loadProps)
.addTokenAuth(token)
.setLabel(finalLabel)
.formatJson()
.addCommonHeader()
- .setEntity(entity)
.addHiddenColumns(true)
.setEntity(entity);
@@ -422,6 +427,7 @@ public class DorisBatchStreamLoad implements Serializable {
} finally {
lock.unlock();
}
+ loadStatistic.add(respContent);
return;
} else {
String errMsg = null;
@@ -494,16 +500,23 @@ public class DorisBatchStreamLoad implements Serializable {
     /** commit offset to frontends. */
     public void commitOffset(
-            String taskId, List<Map<String, String>> meta, long scannedRows, long scannedBytes) {
+            String taskId,
+            List<Map<String, String>> meta,
+            long scannedRows,
+            LoadStatistic loadStatistic) {
         try {
             String url = String.format(COMMIT_URL_PATTERN, frontendAddress, targetDb);
-            Map<String, Object> commitParams = new HashMap<>();
-            commitParams.put("offset", OBJECT_MAPPER.writeValueAsString(meta));
-            commitParams.put("jobId", jobId);
-            commitParams.put("taskId", taskId);
-            commitParams.put("scannedRows", scannedRows);
-            commitParams.put("scannedBytes", scannedBytes);
-            String param = OBJECT_MAPPER.writeValueAsString(commitParams);
+            CommitOffsetRequest commitRequest =
+                    CommitOffsetRequest.builder()
+                            .offset(OBJECT_MAPPER.writeValueAsString(meta))
+                            .jobId(jobId)
+                            .taskId(Long.parseLong(taskId))
+                            .scannedRows(scannedRows)
+                            .filteredRows(loadStatistic.getFilteredRows())
+                            .loadedRows(loadStatistic.getLoadedRows())
+                            .loadBytes(loadStatistic.getLoadBytes())
+                            .build();
+            String param = OBJECT_MAPPER.writeValueAsString(commitRequest);
HttpPutBuilder builder =
new HttpPutBuilder()
diff --git
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
index b5174d82798..3abd9eaabc2 100644
---
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java
@@ -20,6 +20,7 @@ package org.apache.doris.cdcclient.sink;
import org.apache.doris.cdcclient.common.Constants;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.collections.MapUtils;
import org.apache.flink.util.Preconditions;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -29,7 +30,6 @@ import org.apache.http.entity.StringEntity;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
-import java.util.Properties;
/** Builder for HttpPut. */
public class HttpPutBuilder {
@@ -116,9 +116,10 @@ public class HttpPutBuilder {
return this;
}
- public HttpPutBuilder addProperties(Properties properties) {
- // TODO: check duplicate key.
- properties.forEach((key, value) -> header.put(String.valueOf(key),
String.valueOf(value)));
+ public HttpPutBuilder addProperties(Map<String, String> properties) {
+ if (MapUtils.isNotEmpty(properties)) {
+ header.putAll(properties);
+ }
return this;
}
diff --git
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
similarity index 62%
copy from
fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
copy to
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
index f11539e6832..10f715c4307 100644
---
a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java
+++
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
@@ -15,24 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.job.cdc.request;
+package org.apache.doris.cdcclient.sink;
import lombok.Data;
-import lombok.EqualsAndHashCode;
@Data
-@EqualsAndHashCode(callSuper = true)
-public class FetchRecordRequest extends JobBaseRecordRequest {
- private boolean reload = true;
- private int fetchSize;
+public class LoadStatistic {
+ private long filteredRows = 0;
+ private long loadedRows = 0;
+ private long loadBytes = 0;
- @Override
- public boolean isReload() {
- return reload;
+ public void add(RespContent respContent) {
+ this.filteredRows += respContent.getNumberFilteredRows();
+ this.loadedRows += respContent.getNumberLoadedRows();
+ this.loadBytes += respContent.getLoadBytes();
}
- @Override
- public int getFetchSize() {
- return fetchSize;
+ public void clear() {
+ this.filteredRows = 0;
+ this.loadedRows = 0;
+ this.loadBytes = 0;
}
}
diff --git
a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
new file mode 100644
index 00000000000..7f44731e15d
--- /dev/null
+++
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_snapshot_table1 --
+AB 123
+
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
index 2febce4cb36..29680f8506d 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
@@ -155,7 +155,9 @@ suite("test_streaming_mysql_job",
"p0,external,mysql,external_docker,external_do
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":7,\"loadBytes\":334,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 7
+ assert loadStat.loadBytes == 338
assert jobInfo.get(0).get(1) == "RUNNING"
// mock mysql incremental into again
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
index b5451a899ef..b412c470558 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy
@@ -76,7 +76,7 @@ suite("test_streaming_mysql_job_create_alter",
"p0,external,mysql,external_docke
"table.create.properties1.replication_num" = "1"
)
"""
- exception "Only support target properties with prefix
table.create.properties"
+ exception "Not support target properties key
table.create.properties1.replication_num"
}
//error jdbc url format
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
index f063b629f90..a23e5efb0bf 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_errormsg.groovy
@@ -58,7 +58,8 @@ suite("test_streaming_mysql_job_errormsg",
"p0,external,mysql,external_docker,ex
`age` varchar(8) NOT NULL,
PRIMARY KEY (`name`)
) ENGINE=InnoDB"""
- sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES
('ABCDEFG', 'abc');"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES
('ABCDEFG1', 'abc');"""
+ sql """INSERT INTO ${mysqlDb}.${table1} (name, age) VALUES
('ABCDEFG2', '123');"""
}
sql """CREATE JOB ${jobName}
@@ -99,8 +100,50 @@ suite("test_streaming_mysql_job_errormsg",
"p0,external,mysql,external_docker,ex
def jobFailMsg = sql """select errorMsg from jobs("type"="insert")
where Name = '${jobName}' and ExecuteType='STREAMING'"""
log.info("jobFailMsg: " + jobFailMsg)
+ // stream load error: [DATA_QUALITY_ERROR]too many filtered rows
assert jobFailMsg.get(0).get(0).contains("stream load error")
+
+ // add max_filter_ratio to 1
+ sql """ALTER JOB ${jobName}
+ FROM MYSQL
+ TO DATABASE ${currentDb} (
+ "load.max_filter_ratio" = "1"
+ )"""
+
+ sql """RESUME JOB where jobname = '${jobName}'"""
+
+ // check job running
+ try {
+ Awaitility.await().atMost(300, SECONDS)
+ .pollInterval(1, SECONDS).until(
+ {
+ def jobSuccendCount = sql """ select SucceedTaskCount
from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
+ log.info("jobSuccendCount: " + jobSuccendCount)
+ // check job status and succeed task count larger than
1
+ jobSuccendCount.size() == 1 && '1' <=
jobSuccendCount.get(0).get(0)
+ }
+ )
+ } catch (Exception ex){
+ def showjob = sql """select * from jobs("type"="insert") where
Name='${jobName}'"""
+ def showtask = sql """select * from tasks("type"="insert") where
JobName='${jobName}'"""
+ log.info("show job: " + showjob)
+ log.info("show task: " + showtask)
+ throw ex;
+ }
+
+ def jobInfo = sql """
+ select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
+ """
+ log.info("jobInfo: " + jobInfo)
+ def loadStat = parseJson(jobInfo.get(0).get(0));
+ assert loadStat.scannedRows == 2
+ assert loadStat.loadBytes == 115
+ assert loadStat.filteredRows == 1
+ assert jobInfo.get(0).get(1) == "RUNNING"
+
+ qt_select_snapshot_table1 """ SELECT * FROM ${table1} order by name
asc """
+
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
index 8ece9f4ba74..80a18ee5a56 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
@@ -84,7 +84,7 @@ suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,exter
def jobSuccendCount = sql """ select
SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and
ExecuteType='STREAMING' """
log.info("jobSuccendCount: " + jobSuccendCount)
// check job status and succeed task count larger
than 2
- jobSuccendCount.size() == 1 && '1' <=
jobSuccendCount.get(0).get(0)
+ jobSuccendCount.size() == 1 && '2' <=
jobSuccendCount.get(0).get(0)
}
)
} catch (Exception ex){
@@ -99,7 +99,9 @@ suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,exter
select loadStatistic, status, currentOffset from
jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobInfoBeforeRestart: " + jobInfoBeforeRestart)
- assert jobInfoBeforeRestart.get(0).get(0) ==
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStatBefore = parseJson(jobInfoBeforeRestart.get(0).get(0))
+ assert loadStatBefore.scannedRows == 2
+ assert loadStatBefore.loadBytes == 95
assert jobInfoBeforeRestart.get(0).get(1) == "RUNNING"
// Restart FE
@@ -112,7 +114,9 @@ suite("test_streaming_mysql_job_restart_fe",
"docker,mysql,external_docker,exter
select loadStatistic, status, currentOffset from
jobs("type"="insert") where Name='${jobName}'
"""
log.info("jobAfterRestart: " + jobAfterRestart)
- assert jobAfterRestart.get(0).get(0) ==
"{\"scannedRows\":2,\"loadBytes\":94,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStatAfter = parseJson(jobAfterRestart.get(0).get(0))
+ assert loadStatAfter.scannedRows == 2
+ assert loadStatAfter.loadBytes == 95
assert jobAfterRestart.get(0).get(1) == "RUNNING"
assert jobAfterRestart.get(0).get(2) ==
jobInfoBeforeRestart.get(0).get(2)
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
index 7fe8cb73daa..ba2c1247016 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
@@ -148,7 +148,9 @@ suite("test_streaming_postgres_job",
"p0,external,pg,external_docker,external_do
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":7,\"loadBytes\":337,\"fileNumber\":0,\"fileSize\":0}"
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 7
+ assert loadStat.loadBytes == 341
assert jobInfo.get(0).get(1) == "RUNNING"
// mock incremental into again
diff --git
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
index ac299223101..5e01bfe58b5 100644
---
a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
+++
b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
@@ -103,8 +103,10 @@ suite("test_streaming_postgres_job_split",
"p0,external,pg,external_docker,exter
select loadStatistic, status from jobs("type"="insert") where
Name='${jobName}'
"""
log.info("jobInfo: " + jobInfo)
- assert jobInfo.get(0).get(0) ==
"{\"scannedRows\":5,\"loadBytes\":270,\"fileNumber\":0,\"fileSize\":0}"
-
+ def loadStat = parseJson(jobInfo.get(0).get(0))
+ assert loadStat.scannedRows == 5
+ assert loadStat.loadBytes == 273
+
sql """
DROP JOB IF EXISTS where jobname = '${jobName}'
"""
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
index 02e86b54325..d8058ba5fbc 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job.groovy
@@ -108,7 +108,11 @@ suite("test_streaming_insert_job") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+ def loadStat = parseJson(jobInfo.get(0).get(2))
+ assert loadStat.scannedRows == 20
+ assert loadStat.loadBytes == 425
+ assert loadStat.fileNumber == 2
+ assert loadStat.fileSize == 256
def showTask = sql """select * from tasks("type"="insert") where
jobName='${jobName}'"""
log.info("showTask is : " + showTask )
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
index a9439095a64..85d494ca942 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_insert_job_offset.groovy
@@ -173,7 +173,11 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+ def loadStat = parseJson(jobInfo.get(0).get(2))
+ assert loadStat.scannedRows == 10
+ assert loadStat.loadBytes == 218
+ assert loadStat.fileNumber == 1
+ assert loadStat.fileSize == 138
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/example_0.csv\\\"}\"}"
// alter job init offset, Lexicographic order includes example_[0-1]
@@ -211,7 +215,11 @@ suite("test_streaming_insert_job_offset") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
+ def loadStat2 = parseJson(jobInfo.get(0).get(2))
+ assert loadStat2.scannedRows == 30
+ assert loadStat2.loadBytes == 643
+ assert loadStat2.fileNumber == 3
+ assert loadStat2.fileSize == 394
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
// has double example_1.csv and example_0.csv data
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
index 02d44f80a97..520e5b75991 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_alter_offset_restart_fe.groovy
@@ -93,7 +93,11 @@ suite("test_streaming_job_alter_offset_restart_fe",
"docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+ def loadStat = parseJson(jobInfo.get(0).get(2))
+ assert loadStat.scannedRows == 10
+ assert loadStat.loadBytes == 218
+ assert loadStat.fileNumber == 1
+ assert loadStat.fileSize == 138
sql """
PAUSE JOB where jobname = '${jobName}'
@@ -112,7 +116,11 @@ suite("test_streaming_job_alter_offset_restart_fe",
"docker") {
"""
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
- assert jobInfo.get(0).get(1) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+ def loadStat1 = parseJson(jobInfo.get(0).get(1))
+ assert loadStat1.scannedRows == 10
+ assert loadStat1.loadBytes == 218
+ assert loadStat1.fileNumber == 1
+ assert loadStat1.fileSize == 138
assert jobInfo.get(0).get(2) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
// Restart FE
@@ -131,7 +139,11 @@ suite("test_streaming_job_alter_offset_restart_fe",
"docker") {
"""
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/anoexist1234.csv\"}";
- assert jobInfo.get(0).get(1) ==
"{\"scannedRows\":10,\"loadBytes\":218,\"fileNumber\":1,\"fileSize\":138}"
+ def loadStat2 = parseJson(jobInfo.get(0).get(1))
+ assert loadStat2.scannedRows == 10
+ assert loadStat2.loadBytes == 218
+ assert loadStat2.fileNumber == 1
+ assert loadStat2.fileSize == 138
assert jobInfo.get(0).get(2) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
// resume to check whether consumption will resume
@@ -163,7 +175,11 @@ suite("test_streaming_job_alter_offset_restart_fe",
"docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":30,\"loadBytes\":643,\"fileNumber\":3,\"fileSize\":394}"
+ def loadStat3 = parseJson(jobInfo.get(0).get(2))
+ assert loadStat3.scannedRows == 30
+ assert loadStat3.loadBytes == 643
+ assert loadStat3.fileNumber == 3
+ assert loadStat3.fileSize == 394
assert jobInfo.get(0).get(3) ==
"{\"offset\":\"{\\\"fileName\\\":\\\"regression/load/data/anoexist1234.csv\\\"}\"}"
sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
diff --git
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
index 11d2113ce5d..ae03afb47a4 100644
---
a/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
+++
b/regression-test/suites/job_p0/streaming_job/test_streaming_job_restart_fe.groovy
@@ -97,7 +97,11 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+ def loadStat = parseJson(jobInfo.get(0).get(2))
+ assert loadStat.scannedRows == 20
+ assert loadStat.loadBytes == 425
+ assert loadStat.fileNumber == 2
+ assert loadStat.fileSize == 256
// Restart FE
cluster.restartFrontends()
@@ -115,7 +119,11 @@ suite("test_streaming_job_restart_fe", "docker") {
log.info("jobInfo: " + jobInfo)
assert jobInfo.get(0).get(0) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
assert jobInfo.get(0).get(1) ==
"{\"fileName\":\"regression/load/data/example_1.csv\"}";
- assert jobInfo.get(0).get(2) ==
"{\"scannedRows\":20,\"loadBytes\":425,\"fileNumber\":2,\"fileSize\":256}"
+ def loadStatAfter = parseJson(jobInfo.get(0).get(2))
+ assert loadStatAfter.scannedRows == 20
+ assert loadStatAfter.loadBytes == 425
+ assert loadStatAfter.fileNumber == 2
+ assert loadStatAfter.fileSize == 256
sql """ DROP JOB IF EXISTS where jobname = '${jobName}' """
sql """drop table if exists `${tableName}` force"""