This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 834834d  [SparkLoad] Avoid reading the whole Hive table when a 
where clause is added (#5047)
834834d is described below

commit 834834dc44adc2e300ada3ad310b93ad3afab0b4
Author: Dam1029 <[email protected]>
AuthorDate: Tue Dec 15 09:26:42 2020 +0800

    [SparkLoad] Avoid reading the whole Hive table when a where clause is added (#5047)
    
    When Spark load reads from a Hive table, the function loadDataFromHiveTable
    reads the whole Hive table, and the data is only filtered later in process().
    If the Hive table has many partitions and a lot of historical data, the load
    costs too much time and too many resources.
    So we apply the where filter inside loadDataFromHiveTable, at the point where
    the Hive table is read.
    Co-authored-by: 杜安明 <[email protected]>
---
 .../main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git 
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java 
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index e4c3a23..2af4de4 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -97,7 +97,6 @@ public final class SparkDpp implements java.io.Serializable {
     private SparkSession spark = null;
     private EtlJobConfig etlJobConfig = null;
     private LongAccumulator abnormalRowAcc = null;
-    private LongAccumulator unselectedRowAcc = null;
     private LongAccumulator scannedRowsAcc = null;
     private LongAccumulator fileNumberAcc = null;
     private LongAccumulator fileSizeAcc = null;
@@ -120,7 +119,6 @@ public final class SparkDpp implements java.io.Serializable 
{
 
     public void init() {
         abnormalRowAcc = 
spark.sparkContext().longAccumulator("abnormalRowAcc");
-        unselectedRowAcc = 
spark.sparkContext().longAccumulator("unselectedRowAcc");
         scannedRowsAcc = 
spark.sparkContext().longAccumulator("scannedRowsAcc");
         fileNumberAcc = spark.sparkContext().longAccumulator("fileNumberAcc");
         fileSizeAcc = spark.sparkContext().longAccumulator("fileSizeAcc");
@@ -854,6 +852,10 @@ public final class SparkDpp implements 
java.io.Serializable {
             sql.append(column.columnName).append(",");
         });
         sql.deleteCharAt(sql.length() - 1).append(" from 
").append(hiveDbTableName);
+        if (!Strings.isNullOrEmpty(fileGroup.where)) {
+            sql.append(" where ").append(fileGroup.where);
+        }
+
         Dataset<Row> dataframe = spark.sql(sql.toString());
         dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, 
fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
                     dstTableSchema);
@@ -988,12 +990,6 @@ public final class SparkDpp implements 
java.io.Serializable {
                         LOG.info("no data for file file group:" + fileGroup);
                         continue;
                     }
-                    if (!Strings.isNullOrEmpty(fileGroup.where)) {
-                        long originalSize = fileGroupDataframe.count();
-                        fileGroupDataframe = 
fileGroupDataframe.filter(fileGroup.where);
-                        long currentSize = fileGroupDataframe.count();
-                        unselectedRowAcc.add(currentSize - originalSize);
-                    }
 
                     JavaPairRDD<List<Object>, Object[]> ret = 
fillTupleWithPartitionColumn(
                             fileGroupDataframe,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to