This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 4bd7ba657df7524550c25e572d7022da93233cd3
Author: Siyang Tang <[email protected]>
AuthorDate: Tue Aug 22 20:08:19 2023 +0800

    [fix](show) show load warning support load v2 (#22759)
---
 .../java/org/apache/doris/qe/ShowExecutor.java     | 41 +++++++++++-
 .../load_p0/broker_load/test_etl_failed.groovy     | 78 ++++++++++++++++++++++
 2 files changed, 118 insertions(+), 1 deletion(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
index 6c14eb2e6d..9ec777675b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ShowExecutor.java
@@ -184,6 +184,7 @@ import org.apache.doris.load.ExportMgr;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.LoadJob.JobState;
+import org.apache.doris.load.loadv2.LoadManager;
 import org.apache.doris.load.routineload.RoutineLoadJob;
 import org.apache.doris.mtmv.MTMVJobManager;
 import org.apache.doris.mtmv.metadata.MTMVJob;
@@ -1291,6 +1292,11 @@ public class ShowExecutor {
         }
 
         Database db = 
env.getInternalCatalog().getDbOrAnalysisException(showWarningsStmt.getDbName());
+        ShowResultSet showResultSet = 
handleShowLoadWarningV2(showWarningsStmt, db);
+        if (showResultSet != null) {
+            resultSet = showResultSet;
+            return;
+        }
 
         long dbId = db.getId();
         Load load = env.getLoadInstance();
@@ -1298,7 +1304,6 @@ public class ShowExecutor {
         LoadJob job = null;
         String label = null;
         if (showWarningsStmt.isFindByLabel()) {
-            label = showWarningsStmt.getLabel();
             jobId = load.getLatestJobIdByLabel(dbId, 
showWarningsStmt.getLabel());
             job = load.getLoadJob(jobId);
             if (job == null) {
@@ -1343,6 +1348,40 @@ public class ShowExecutor {
         resultSet = new ShowResultSet(showWarningsStmt.getMetaData(), rows);
     }
 
+    private ShowResultSet handleShowLoadWarningV2(ShowLoadWarningsStmt 
showWarningsStmt, Database db)
+            throws AnalysisException {
+        LoadManager loadManager = Env.getCurrentEnv().getLoadManager();
+        if (showWarningsStmt.isFindByLabel()) {
+            List<List<Comparable>> loadJobInfosByDb = 
loadManager.getLoadJobInfosByDb(db.getId(),
+                    showWarningsStmt.getLabel(),
+                    true, null);
+            if (CollectionUtils.isEmpty(loadJobInfosByDb)) {
+                return null;
+            }
+            List<List<String>> infoList = 
Lists.newArrayListWithCapacity(loadJobInfosByDb.size());
+            for (List<Comparable> comparables : loadJobInfosByDb) {
+                List<String> singleInfo = 
comparables.stream().map(Object::toString).collect(Collectors.toList());
+                infoList.add(singleInfo);
+            }
+            return new ShowResultSet(showWarningsStmt.getMetaData(), infoList);
+        }
+        org.apache.doris.load.loadv2.LoadJob loadJob = 
loadManager.getLoadJob(showWarningsStmt.getJobId());
+        if (loadJob == null) {
+            return null;
+        }
+        List<String> singleInfo;
+        try {
+            singleInfo = loadJob
+                    .getShowInfo()
+                    .stream()
+                    .map(Objects::toString)
+                    .collect(Collectors.toList());
+        } catch (DdlException e) {
+            throw new AnalysisException(e.getMessage());
+        }
+        return new ShowResultSet(showWarningsStmt.getMetaData(), 
Lists.newArrayList(Collections.singleton(singleInfo)));
+    }
+
     private void handleShowLoadWarningsFromURL(ShowLoadWarningsStmt 
showWarningsStmt, URL url)
             throws AnalysisException {
         String host = url.getHost();
diff --git a/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy 
b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
new file mode 100644
index 0000000000..3b4eac874d
--- /dev/null
+++ b/regression-test/suites/load_p0/broker_load/test_etl_failed.groovy
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_etl_failed", "load_p0") {
+    def tableName = "test_etl_failed"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k1` int(20) NULL,
+            `k2` bigint(20) NULL,
+            `v1` tinyint(4)  NULL,
+            `v2` string  NULL,
+            `v3` date NOT NULL,
+            `v4` datetime NOT NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`, `k2`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 3
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1"); 
+    """
+    String label = "test_etl_failed"
+    String path = 
"s3://doris-build-1308700295/regression/load/data/etl_failure/etl-failure.csv"
+    String format = "CSV"
+    String ak = getS3AK()
+    String sk = getS3SK()
+    sql """
+        LOAD LABEL ${label} (
+                DATA INFILE("$path")
+                INTO TABLE ${tableName}
+                FORMAT AS ${format}
+            )
+            WITH S3 (
+                "AWS_ACCESS_KEY" = "$ak",
+                "AWS_SECRET_KEY" = "$sk",
+                "AWS_ENDPOINT" = "cos.ap-beijing.myqcloud.com",
+                "AWS_REGION" = "ap-beijing"
+            )
+            PROPERTIES(
+                "use_new_load_scan_node" = "true",
+                "max_filter_ratio" = "0.1"
+            );
+    """
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        String[][] result = sql """ show load where label="$label" order by 
createtime desc limit 1; """
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            assertTrue(1 == 2, "etl should be failed")
+            break;
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            break;
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label")
+        }
+    }
+    String[][] result = sql """ show load warnings where label="$label" """
+    assertTrue(result[0].size() > 1, "warning show be not null")
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to