This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 82940a4  [Spark Load] Fix spark load bugs (#4464)
82940a4 is described below

commit 82940a4905b6b578af943b1ea0663bc30aa933a6
Author: wyb <[email protected]>
AuthorDate: Thu Aug 27 23:40:33 2020 +0800

    [Spark Load] Fix spark load bugs (#4464)
    
    1. write the dpp result even when dpp throws an exception
    2. boolean values: true, false (case-insensitive), 0, 1
    3. fix the wrong dest column used in the source data check
    4. support * in source file paths
    5. if the job state is cancelled or finished, submitPushTasks would throw an "all partitions have no load data" exception,
        because tableToLoadPartitions was already cleaned up
    
    #3433
---
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  8 ++-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java | 12 ++++
 .../apache/doris/load/loadv2/SparkLoadJobTest.java | 16 +++++
 .../apache/doris/load/loadv2/dpp/ColumnParser.java |  3 +-
 .../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 80 +++++++++++++---------
 .../apache/doris/load/loadv2/etl/EtlJobConfig.java |  5 +-
 6 files changed, 87 insertions(+), 37 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 1d9eedc..96b7064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -239,9 +239,11 @@ public class SparkEtlJobHandler {
                 byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
                 String dppResultStr = new String(data, "UTF-8");
                 DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
-                status.setDppResult(dppResult);
-                if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
-                    status.setFailMsg(dppResult.failedReason);
+                if (dppResult != null) {
+                    status.setDppResult(dppResult);
+                    if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
+                        status.setFailMsg(dppResult.failedReason);
+                    }
                 }
             } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                 LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
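
Note: the null check in the hunk above matters because Gson's fromJson returns null rather than throwing when the input string is empty or is the JSON literal "null", so an empty dpp result file would previously NPE on dppResult.failedReason. A minimal standalone sketch of that Gson behavior (class and field names are illustrative, not Doris code):

    import com.google.gson.Gson;

    public class GsonNullDemo {
        static class DppResult {
            boolean isSuccess;
            String failedReason;
        }

        public static void main(String[] args) {
            Gson gson = new Gson();
            // both calls return null instead of throwing
            System.out.println(gson.fromJson("", DppResult.class));     // null
            System.out.println(gson.fromJson("null", DppResult.class)); // null
            // without the null check, dppResult.failedReason would NPE here
        }
    }
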
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 6576be2..d18416d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -412,6 +412,14 @@ public class SparkLoadJob extends BulkLoadJob {
         try {
             writeLock();
             try {
+                // check that the state is still LOADING. If the state is cancelled or finished, return.
+                // Without this early return, the loop below would throw an "all partitions have no load data" exception,
+                // because tableToLoadPartitions was already cleaned up.
+                if (state != JobState.LOADING) {
+                    LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
+                    return totalTablets;
+                }
+
                 for (Map.Entry<Long, Set<Long>> entry : tableToLoadPartitions.entrySet()) {
                     long tableId = entry.getKey();
                     OlapTable table = (OlapTable) db.getTable(tableId);
@@ -565,6 +573,10 @@ public class SparkLoadJob extends BulkLoadJob {
 
         // submit push tasks
         Set<Long> totalTablets = submitPushTasks();
+        if (totalTablets.isEmpty()) {
+            LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
+            return;
+        }
 
         // update status
         boolean canCommitJob = false;
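
Note: the two hunks above work together. submitPushTasks now returns an empty tablet set instead of iterating the already-cleared tableToLoadPartitions (which surfaced as the "all partitions have no load data" exception), and its caller treats an empty set as "nothing left to do". A simplified, self-contained sketch of the pattern, with the Doris-specific locking and task-building elided:

    import java.util.HashSet;
    import java.util.Set;

    // sketch only: the guard pattern from the diff, not the full SparkLoadJob logic
    class PushTaskGuardSketch {
        enum JobState { LOADING, CANCELLED, FINISHED }

        JobState state = JobState.FINISHED;

        Set<Long> submitPushTasks() {
            Set<Long> totalTablets = new HashSet<>();
            if (state != JobState.LOADING) {
                // job was cancelled/finished concurrently and
                // tableToLoadPartitions was already cleaned up
                return totalTablets;
            }
            // ... iterate tableToLoadPartitions and create push tasks ...
            return totalTablets;
        }

        void updateLoadingStatus() {
            Set<Long> totalTablets = submitPushTasks();
            if (totalTablets.isEmpty()) {
                return; // nothing to commit; avoids the spurious exception
            }
            // ... check finished replicas and try to commit the job ...
        }
    }
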
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e86cad0..57bbd9a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -415,6 +415,22 @@ public class SparkLoadJobTest {
     }
 
     @Test
+    public void testSubmitTasksWhenStateFinished(@Mocked Catalog catalog, @Injectable String originStmt,
+                                                 @Injectable Database db) throws Exception {
+        new Expectations() {
+            {
+                catalog.getDb(dbId);
+                result = db;
+            }
+        };
+
+        SparkLoadJob job = getEtlStateJob(originStmt);
+        job.state = JobState.FINISHED;
+        Set<Long> totalTablets = Deencapsulation.invoke(job, "submitPushTasks");
+        Assert.assertTrue(totalTablets.isEmpty());
+    }
+
+    @Test
     public void testStateUpdateInfoPersist() throws IOException {
         String fileName = "./testStateUpdateInfoPersistFile";
         File file = new File(fileName);
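
Note: the new test reaches the private submitPushTasks method through JMockit's Deencapsulation reflection helper. A minimal usage sketch (the Job class here is hypothetical):

    import mockit.Deencapsulation;

    public class DeencapsulationSketch {
        static class Job {
            private int twice(int x) {
                return 2 * x;
            }
        }

        public static void main(String[] args) {
            // invokes a private method by name via reflection
            int result = Deencapsulation.invoke(new Job(), "twice", 21);
            System.out.println(result); // 42
        }
    }
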
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index 0b91cb0..d5e0cee 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -138,7 +138,8 @@ class BooleanParser extends ColumnParser {
     @Override
     public boolean parse(String value) {
         if (value.equalsIgnoreCase("true")
-                || value.equalsIgnoreCase("false")) {
+                || value.equalsIgnoreCase("false")
+                || value.equals("0") || value.equals("1")) {
             return true;
         }
         return false;
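
Note: after this change a BOOLEAN column passes the strict-mode check for exactly four input shapes: "true"/"false" in any letter case, plus the literals "0" and "1" (matching the cast in SparkDpp.java below, which normalizes values to "1" or "0"). A standalone restatement of the predicate:

    // same check as BooleanParser.parse above
    static boolean isValidBoolean(String value) {
        return value.equalsIgnoreCase("true")
                || value.equalsIgnoreCase("false")
                || value.equals("0") || value.equals("1");
    }

    // isValidBoolean("TRUE") -> true
    // isValidBoolean("1")    -> true
    // isValidBoolean("yes")  -> false, so the row is counted as abnormal in strict mode
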
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 53c9111..bc0a6cc 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -105,6 +105,7 @@ public final class SparkDpp implements java.io.Serializable {
     // because hadoop configuration is not serializable,
     // we need to wrap it so that we can use it in executor.
     private SerializableConfiguration serializableHadoopConf;
+    private DppResult dppResult = new DppResult();
 
 
     public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
@@ -485,7 +486,8 @@ public final class SparkDpp implements java.io.Serializable {
                 dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
             } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                 dataframe = dataframe.withColumn(dstField.name(),
-                        functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
+                        functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
+                                .when(dataframe.col(dstField.name()).equalTo("1"), "1")
                                 .otherwise("0"));
             } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
                 dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
@@ -535,9 +537,10 @@ public final class SparkDpp implements java.io.Serializable {
                 dataSrcColumns.add(column.columnName);
             }
         }
-        List<String> dstTableNames = new ArrayList<>();
-        for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-            dstTableNames.add(column.columnName);
+        // for getting schema to check source data
+        Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
+        for (int i = 0; i < baseIndex.columns.size(); i++) {
+            dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
         }
         List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
         srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
@@ -566,25 +569,28 @@ public final class SparkDpp implements java.io.Serializable {
                         validRow = false;
                     } else {
                         for (int i = 0; i < attributes.length; ++i) {
-                            if (attributes[i].equals(NULL_FLAG)) {
-                                if (baseIndex.columns.get(i).isAllowNull) {
+                            StructField field = srcSchema.apply(i);
+                            String srcColumnName = field.name();
+                            if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
                                     attributes[i] = null;
                                 } else {
-                                    LOG.warn("colunm:" + i + " can not be null. row:" + record);
+                                    LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+                                            + " can not be null. row:" + record);
                                     validRow = false;
                                     break;
                                 }
                             }
-                            boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
+                            boolean isStrictMode = etlJobConfig.properties.strictMode;
                             if (isStrictMode) {
-                                StructField field = srcSchema.apply(i);
-                                if (dstTableNames.contains(field.name())) {
-                                    String type = columns.get(i).columnType;
+                                if (dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                    int index = dstColumnNameToIndex.get(srcColumnName);
+                                    String type = columns.get(index).columnType;
                                     if (type.equalsIgnoreCase("CHAR")
                                             || type.equalsIgnoreCase("VARCHAR")) {
                                         continue;
                                     }
-                                    ColumnParser parser = parsers.get(i);
+                                    ColumnParser parser = parsers.get(index);
                                     boolean valid = parser.parse(attributes[i]);
                                     if (!valid) {
                                         validRow = false;
@@ -726,12 +732,20 @@ public final class SparkDpp implements java.io.Serializable {
             throws SparkDppException, IOException, URISyntaxException {
         Dataset<Row> fileGroupDataframe = null;
         for (String filePath : filePaths) {
-            fileNumberAcc.add(1);
             try {
                 URI uri = new URI(filePath);
                 FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
-                FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
-                fileSizeAcc.add(fileStatus.getLen());
+                FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
+                if (fileStatuses == null) {
+                    throw new SparkDppException("fs list status failed: " + filePath);
+                }
+                for (FileStatus fileStatus : fileStatuses) {
+                    if (fileStatus.isDirectory()) {
+                        continue;
+                    }
+                    fileNumberAcc.add(1);
+                    fileSizeAcc.add(fileStatus.getLen());
+                }
             } catch (Exception e) {
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
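
Note: replacing getFileStatus with globStatus in the hunk above is what adds * support in source file paths. Per the Hadoop FileSystem contract, globStatus returns null when the pattern contains no glob characters and the literal path does not exist, and an empty array when a glob matches nothing, hence both the null check and the per-file loop. A self-contained sketch of the same accounting (the hdfs path is illustrative, not from the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class GlobCountSketch {
        public static void main(String[] args) throws Exception {
            // illustrative pattern; in the job it comes from the file group's paths
            Path pattern = new Path("hdfs://ns1/user/doris/data/part-*");
            FileSystem fs = FileSystem.get(pattern.toUri(), new Configuration());
            FileStatus[] statuses = fs.globStatus(pattern);
            if (statuses == null) {
                throw new IllegalStateException("path not found: " + pattern);
            }
            long fileNumber = 0;
            long fileSize = 0;
            for (FileStatus status : statuses) {
                if (status.isDirectory()) {
                    continue; // only regular files are counted, as in the diff
                }
                fileNumber++;
                fileSize += status.getLen();
            }
            System.out.println(fileNumber + " files, " + fileSize + " bytes");
        }
    }
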
@@ -770,8 +784,7 @@ public final class SparkDpp implements java.io.Serializable {
         return dataframe;
     }
 
-    private DppResult process() throws Exception {
-        DppResult dppResult = new DppResult();
+    private void process() throws Exception {
         try {
             for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
                 Long tableId = entry.getKey();
@@ -852,34 +865,26 @@ public final class SparkDpp implements java.io.Serializable {
                 }
                 processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
             }
-            spark.stop();
+            LOG.info("invalid rows contents:" + invalidRows.value());
+            dppResult.isSuccess = true;
+            dppResult.failedReason = "";
         } catch (Exception exception) {
             LOG.warn("spark dpp failed for exception:" + exception);
             dppResult.isSuccess = false;
             dppResult.failedReason = exception.getMessage();
+            throw exception;
+        } finally {
+            spark.stop();
             dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
             dppResult.scannedRows = scannedRowsAcc.value();
             dppResult.fileNumber = fileNumberAcc.value();
             dppResult.fileSize = fileSizeAcc.value();
             dppResult.abnormalRows = abnormalRowAcc.value();
             dppResult.partialAbnormalRows = invalidRows.value();
-            throw exception;
         }
-        LOG.info("invalid rows contents:" + invalidRows.value());
-        dppResult.isSuccess = true;
-        dppResult.failedReason = "";
-        dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
-        dppResult.scannedRows = scannedRowsAcc.value();
-        dppResult.fileNumber = fileNumberAcc.value();
-        dppResult.fileSize = fileSizeAcc.value();
-        dppResult.abnormalRows = abnormalRowAcc.value();
-        dppResult.partialAbnormalRows = invalidRows.value();
-        return dppResult;
     }
 
-    public void doDpp() throws Exception {
-        // write dpp result to output
-        DppResult dppResult = process();
+    private void writeDppResult(DppResult dppResult) throws Exception {
         String outputPath = etlJobConfig.getOutputPath();
         String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
         URI uri = new URI(outputPath);
@@ -891,5 +896,16 @@ public final class SparkDpp implements java.io.Serializable {
         outputStream.write('\n');
         outputStream.close();
     }
+
+    public void doDpp() throws Exception {
+        try {
+            process();
+        } catch (Exception e) {
+            throw e;
+        } finally {
+            // write dpp result to file in outputPath
+            writeDppResult(dppResult);
+        }
+    }
 }
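
Note: the restructuring above is fix 1 from the commit message. The result statistics move into a finally block, and doDpp now always writes the dpp result file, on success and on failure alike, so the FE side (see the SparkEtlJobHandler hunk at the top) can surface the failure reason instead of finding no result at all. The control flow, reduced to a sketch:

    // control-flow sketch of the new process()/doDpp() shape
    private void process() throws Exception {
        try {
            // ... run the etl transformations ...
            dppResult.isSuccess = true;
            dppResult.failedReason = "";
        } catch (Exception e) {
            dppResult.isSuccess = false;
            dppResult.failedReason = e.getMessage();
            throw e; // still propagate so the Spark job is marked failed
        } finally {
            spark.stop();
            // row/file counters are filled here so both paths report statistics
        }
    }

    public void doDpp() throws Exception {
        try {
            process();
        } finally {
            // written even when process() threw
            writeDppResult(dppResult);
        }
    }
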
 
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9dec697..fe81aeb 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -111,7 +111,10 @@ import java.util.Map;
                 },
                 "where": "k2 > 10",
                 "isNegative": false,
-                "hiveTableName": "hive_db.table"
+                "hiveDbTableName": "hive_db.table",
+                "hiveTableProperties": {
+                    "hive.metastore.uris": "thrift://host:port"
+                }
             }]
         }
     },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
