This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4bd7ba657df7524550c25e572d7022da93233cd3 Author: Siyang Tang <[email protected]> AuthorDate: Tue Aug 22 20:08:19 2023 +0800 [fix](show) show load warning support load v2 (#22759) --- .../java/org/apache/doris/qe/ShowExecutor.java | 41 +++++++++++- .../load_p0/broker_load/test_etl_failed.groovy | 78 ++++++++++++++++++++++ 2 files changed, 118 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java index 6c14eb2e6d..9ec777675b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java @@ -184,6 +184,7 @@ import org.apache.doris.load.ExportMgr; import org.apache.doris.load.Load; import org.apache.doris.load.LoadJob; import org.apache.doris.load.LoadJob.JobState; +import org.apache.doris.load.loadv2.LoadManager; import org.apache.doris.load.routineload.RoutineLoadJob; import org.apache.doris.mtmv.MTMVJobManager; import org.apache.doris.mtmv.metadata.MTMVJob; @@ -1291,6 +1292,11 @@ public class ShowExecutor { } Database db = env.getInternalCatalog().getDbOrAnalysisException(showWarningsStmt.getDbName()); + ShowResultSet showResultSet = handleShowLoadWarningV2(showWarningsStmt, db); + if (showResultSet != null) { + resultSet = showResultSet; + return; + } long dbId = db.getId(); Load load = env.getLoadInstance(); @@ -1298,7 +1304,6 @@ public class ShowExecutor { LoadJob job = null; String label = null; if (showWarningsStmt.isFindByLabel()) { - label = showWarningsStmt.getLabel(); jobId = load.getLatestJobIdByLabel(dbId, showWarningsStmt.getLabel()); job = load.getLoadJob(jobId); if (job == null) { @@ -1343,6 +1348,40 @@ public class ShowExecutor { resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows); } + private ShowResultSet handleShowLoadWarningV2(ShowLoadWarningsStmt showWarningsStmt, Database db) + throws AnalysisException { + LoadManager loadManager = Env.getCurrentEnv().getLoadManager(); + if (showWarningsStmt.isFindByLabel()) { + List<List<Comparable>> loadJobInfosByDb = loadManager.getLoadJobInfosByDb(db.getId(), + showWarningsStmt.getLabel(), + true, null); + if (CollectionUtils.isEmpty(loadJobInfosByDb)) { + return null; + } + List<List<String>> infoList = Lists.newArrayListWithCapacity(loadJobInfosByDb.size()); + for (List<Comparable> comparables : loadJobInfosByDb) { + List<String> singleInfo = comparables.stream().map(Object::toString).collect(Collectors.toList()); + infoList.add(singleInfo); + } + return new ShowResultSet(showWarningsStmt.getMetaData(), infoList); + } + org.apache.doris.load.loadv2.LoadJob loadJob = loadManager.getLoadJob(showWarningsStmt.getJobId()); + if (loadJob == null) { + return null; + } + List<String> singleInfo; + try { + singleInfo = loadJob + .getShowInfo() + .stream() + .map(Objects::toString) + .collect(Collectors.toList()); + } catch (DdlException e) { + throw new AnalysisException(e.getMessage()); + } + return new ShowResultSet(showWarningsStmt.getMetaData(), Lists.newArrayList(Collections.singleton(singleInfo))); + } + private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt showWarningsStmt, URL url) throws AnalysisException { String host = url.getHost(); diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy new file mode 100644 index 0000000000..3b4eac874d --- /dev/null +++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_etl_failed", "load_p0") { + def tableName = "test_etl_failed" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k1` int(20) NULL, + `k2` bigint(20) NULL, + `v1` tinyint(4) NULL, + `v2` string NULL, + `v3` date NOT NULL, + `v4` datetime NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3 + PROPERTIES ("replication_allocation" = "tag.location.default: 1"); + """ + String label = "test_etl_failed" + String path = "s3://doris-build-1308700295/regression/load/data/etl_failure/etl-failure.csv" + String format = "CSV" + String ak = getS3AK() + String sk = getS3SK() + sql """ + LOAD LABEL ${label} ( + DATA INFILE("$path") + INTO TABLE ${tableName} + FORMAT AS ${format} + ) + WITH S3 ( + "AWS_ACCESS_KEY" = "$ak", + "AWS_SECRET_KEY" = "$sk", + "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com", + "AWS_REGION" = "ap-beijing" + ) + PROPERTIES( + "use_new_load_scan_node" = "true", + "max_filter_ratio" = "0.1" + ); + """ + + def max_try_milli_secs = 600000 + while (max_try_milli_secs > 0) { + String[][] result = sql """ show load where label="$label" order by createtime desc limit 1; """ + if (result[0][2].equals("FINISHED")) { + logger.info("Load FINISHED " + label) + assertTrue(1 == 2, "etl should be failed") + break; + } + if (result[0][2].equals("CANCELLED")) { + break; + } + Thread.sleep(1000) + max_try_milli_secs -= 1000 + if(max_try_milli_secs <= 0) { + assertTrue(1 == 2, "load Timeout: $label") + } + } + String[][] result = sql """ show load warnings where label="$label" """ + assertTrue(result[0].size() > 1, "warning show be not null") +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
