This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 82940a4 [Spark Load] Fix spark load bugs (#4464)
82940a4 is described below
commit 82940a4905b6b578af943b1ea0663bc30aa933a6
Author: wyb <[email protected]>
AuthorDate: Thu Aug 27 23:40:33 2020 +0800
[Spark Load] Fix spark load bugs (#4464)
1. Fix writing the dpp result when dpp throws an exception
2. Accept boolean values: true, false (case-insensitive), 0, 1
3. Fix the wrong destination column being used for the source data check
4. Support * (glob patterns) in the source file path
5. If the job state is cancelled or finished, submitPushTasks would throw an
   "all partitions have no load data" exception, because tableToLoadPartitions
   was already cleaned up
#3433
---
.../doris/load/loadv2/SparkEtlJobHandler.java | 8 ++-
.../org/apache/doris/load/loadv2/SparkLoadJob.java | 12 ++++
.../apache/doris/load/loadv2/SparkLoadJobTest.java | 16 +++++
.../apache/doris/load/loadv2/dpp/ColumnParser.java | 3 +-
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 80 +++++++++++++---------
.../apache/doris/load/loadv2/etl/EtlJobConfig.java | 5 +-
6 files changed, 87 insertions(+), 37 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 1d9eedc..96b7064 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -239,9 +239,11 @@ public class SparkEtlJobHandler {
                byte[] data = BrokerUtil.readFile(dppResultFilePath, brokerDesc);
                String dppResultStr = new String(data, "UTF-8");
                DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
-               status.setDppResult(dppResult);
-               if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
-                   status.setFailMsg(dppResult.failedReason);
+               if (dppResult != null) {
+                   status.setDppResult(dppResult);
+                   if (status.getState() == TEtlState.CANCELLED && !Strings.isNullOrEmpty(dppResult.failedReason)) {
+                       status.setFailMsg(dppResult.failedReason);
+                   }
                }
            } catch (UserException | JsonSyntaxException | UnsupportedEncodingException e) {
                LOG.warn("read broker file failed. path: {}", dppResultFilePath, e);
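
Side note on the change above: Gson.fromJson returns null rather than throwing when the
input is empty or the literal "null", so the previously unguarded status.setDppResult
could be handed a null result. A minimal standalone sketch of the guarded parse (field
names taken from this diff; not the exact FE code):

    import com.google.gson.Gson;
    import com.google.gson.JsonSyntaxException;

    String dppResultStr = "";  // e.g. an empty dpp result file read from the broker
    try {
        // Gson yields null for empty or "null" input instead of throwing.
        DppResult dppResult = new Gson().fromJson(dppResultStr, DppResult.class);
        if (dppResult != null) {
            System.out.println("failed reason: " + dppResult.failedReason);
        }
    } catch (JsonSyntaxException e) {
        // malformed JSON still throws; only empty/"null" input yields null
    }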
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 6576be2..d18416d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -412,6 +412,14 @@ public class SparkLoadJob extends BulkLoadJob {
try {
writeLock();
try {
+           // Check that the state is still loading. If the state is cancelled or finished, return.
+           // Otherwise submitPushTasks would throw an "all partitions have no load data" exception,
+           // because tableToLoadPartitions was already cleaned up.
+           if (state != JobState.LOADING) {
+               LOG.warn("job state is not loading. job id: {}, state: {}", id, state);
+               return totalTablets;
+           }
+
            for (Map.Entry<Long, Set<Long>> entry : tableToLoadPartitions.entrySet()) {
long tableId = entry.getKey();
OlapTable table = (OlapTable) db.getTable(tableId);
@@ -565,6 +573,10 @@ public class SparkLoadJob extends BulkLoadJob {
// submit push tasks
Set<Long> totalTablets = submitPushTasks();
+       if (totalTablets.isEmpty()) {
+           LOG.warn("total tablets set is empty. job id: {}, state: {}", id, state);
+           return;
+       }
// update status
boolean canCommitJob = false;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
index e86cad0..57bbd9a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java
@@ -415,6 +415,22 @@ public class SparkLoadJobTest {
}
@Test
+   public void testSubmitTasksWhenStateFinished(@Mocked Catalog catalog, @Injectable String originStmt,
+                                                @Injectable Database db) throws Exception {
+       new Expectations() {
+           {
+               catalog.getDb(dbId);
+               result = db;
+           }
+       };
+
+       SparkLoadJob job = getEtlStateJob(originStmt);
+       job.state = JobState.FINISHED;
+       Set<Long> totalTablets = Deencapsulation.invoke(job, "submitPushTasks");
+       Assert.assertTrue(totalTablets.isEmpty());
+   }
+
+ @Test
public void testStateUpdateInfoPersist() throws IOException {
String fileName = "./testStateUpdateInfoPersistFile";
File file = new File(fileName);
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index 0b91cb0..d5e0cee 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -138,7 +138,8 @@ class BooleanParser extends ColumnParser {
@Override
public boolean parse(String value) {
if (value.equalsIgnoreCase("true")
- || value.equalsIgnoreCase("false")) {
+ || value.equalsIgnoreCase("false")
+ || value.equals("0") || value.equals("1")) {
return true;
}
        return false;
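
For context on fix #2, validation (BooleanParser above) and normalization (the SparkDpp
change below) now agree on the same four literals. A condensed sketch of that contract,
using hypothetical helper names rather than the Doris API:

    // Accepts exactly the literals the ETL job treats as booleans.
    static boolean isValidBoolean(String value) {
        return value.equalsIgnoreCase("true")
                || value.equalsIgnoreCase("false")
                || value.equals("0") || value.equals("1");
    }

    // Mirrors the Spark-side normalization: "true" (any case) and "1"
    // become "1", everything else becomes "0".
    static String normalizeBoolean(String value) {
        return (value.equalsIgnoreCase("true") || value.equals("1")) ? "1" : "0";
    }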
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 53c9111..bc0a6cc 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -105,6 +105,7 @@ public final class SparkDpp implements java.io.Serializable {
// because hadoop configuration is not serializable,
// we need to wrap it so that we can use it in executor.
private SerializableConfiguration serializableHadoopConf;
+ private DppResult dppResult = new DppResult();
public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
@@ -485,7 +486,8 @@ public final class SparkDpp implements java.io.Serializable {
                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast("date"));
            } else if (column.columnType.equalsIgnoreCase("BOOLEAN")) {
                dataframe = dataframe.withColumn(dstField.name(),
-                       functions.when(dataframe.col(dstField.name()).equalTo("true"), "1")
+                       functions.when(functions.lower(dataframe.col(dstField.name())).equalTo("true"), "1")
+                               .when(dataframe.col(dstField.name()).equalTo("1"), "1")
                                .otherwise("0"));
            } else if (!column.columnType.equalsIgnoreCase(BITMAP_TYPE) && !dstField.dataType().equals(DataTypes.StringType)) {
                dataframe = dataframe.withColumn(dstField.name(), dataframe.col(dstField.name()).cast(dstField.dataType()));
@@ -535,9 +537,10 @@ public final class SparkDpp implements java.io.Serializable {
dataSrcColumns.add(column.columnName);
}
}
-       List<String> dstTableNames = new ArrayList<>();
-       for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
-           dstTableNames.add(column.columnName);
+       // for getting schema to check source data
+       Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
+       for (int i = 0; i < baseIndex.columns.size(); i++) {
+           dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
        }
List<String> srcColumnsWithColumnsFromPath = new ArrayList<>();
srcColumnsWithColumnsFromPath.addAll(dataSrcColumns);
@@ -566,25 +569,28 @@ public final class SparkDpp implements java.io.Serializable {
                        validRow = false;
                    } else {
                        for (int i = 0; i < attributes.length; ++i) {
-                           if (attributes[i].equals(NULL_FLAG)) {
-                               if (baseIndex.columns.get(i).isAllowNull) {
+                           StructField field = srcSchema.apply(i);
+                           String srcColumnName = field.name();
+                           if (attributes[i].equals(NULL_FLAG) && dstColumnNameToIndex.containsKey(srcColumnName)) {
+                               if (baseIndex.columns.get(dstColumnNameToIndex.get(srcColumnName)).isAllowNull) {
                                    attributes[i] = null;
                                } else {
-                                   LOG.warn("colunm:" + i + " can not be null. row:" + record);
+                                   LOG.warn("column name:" + srcColumnName + ", attribute: " + i
+                                           + " can not be null. row:" + record);
                                    validRow = false;
                                    break;
                                }
                            }
-                           boolean isStrictMode = (boolean) etlJobConfig.properties.strictMode;
+                           boolean isStrictMode = etlJobConfig.properties.strictMode;
                            if (isStrictMode) {
-                               StructField field = srcSchema.apply(i);
-                               if (dstTableNames.contains(field.name())) {
-                                   String type = columns.get(i).columnType;
+                               if (dstColumnNameToIndex.containsKey(srcColumnName)) {
+                                   int index = dstColumnNameToIndex.get(srcColumnName);
+                                   String type = columns.get(index).columnType;
                                    if (type.equalsIgnoreCase("CHAR")
                                            || type.equalsIgnoreCase("VARCHAR")) {
                                        continue;
                                    }
-                                   ColumnParser parser = parsers.get(i);
+                                   ColumnParser parser = parsers.get(index);
                                    boolean valid = parser.parse(attributes[i]);
                                    if (!valid) {
                                        validRow = false;
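
The core of fix #3, restated: the source column order can differ from the destination
schema order, so per-column checks must resolve columns by name, not by position. A tiny
standalone sketch of the mapping (hypothetical types, not the Doris classes):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Build a name -> destination-index map once, then look up each source
    // column by name instead of assuming source and destination share order.
    static Map<String, Integer> indexByName(List<String> dstColumnNames) {
        Map<String, Integer> dstColumnNameToIndex = new HashMap<>();
        for (int i = 0; i < dstColumnNames.size(); i++) {
            dstColumnNameToIndex.put(dstColumnNames.get(i), i);
        }
        return dstColumnNameToIndex;
    }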
@@ -726,12 +732,20 @@ public final class SparkDpp implements java.io.Serializable {
throws SparkDppException, IOException, URISyntaxException {
Dataset<Row> fileGroupDataframe = null;
for (String filePath : filePaths) {
- fileNumberAcc.add(1);
try {
URI uri = new URI(filePath);
                FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value());
-               FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
-               fileSizeAcc.add(fileStatus.getLen());
+               FileStatus[] fileStatuses = fs.globStatus(new Path(filePath));
+               if (fileStatuses == null) {
+                   throw new SparkDppException("fs list status failed: " + filePath);
+               }
+               for (FileStatus fileStatus : fileStatuses) {
+                   if (fileStatus.isDirectory()) {
+                       continue;
+                   }
+                   fileNumberAcc.add(1);
+                   fileSizeAcc.add(fileStatus.getLen());
+               }
} catch (Exception e) {
LOG.warn("parse path failed:" + filePath);
throw e;
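
For reference on fix #4: the change above swaps a single-file getFileStatus for Hadoop's
glob expansion, which is what makes * in the source path work. A minimal standalone
sketch of that pattern (the hdfs:// path is an assumed example, not from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static long totalFileSize(String pathWithGlob) throws Exception {
        Path pattern = new Path(pathWithGlob);  // e.g. hdfs://ns1/data/file_*.csv
        FileSystem fs = FileSystem.get(pattern.toUri(), new Configuration());
        FileStatus[] statuses = fs.globStatus(pattern);
        if (statuses == null) {  // null means the path/pattern could not be resolved
            throw new IllegalStateException("fs list status failed: " + pathWithGlob);
        }
        long total = 0;
        for (FileStatus status : statuses) {
            if (!status.isDirectory()) {  // skip directories, as the fix does
                total += status.getLen();
            }
        }
        return total;
    }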
@@ -770,8 +784,7 @@ public final class SparkDpp implements java.io.Serializable {
return dataframe;
}
- private DppResult process() throws Exception {
- DppResult dppResult = new DppResult();
+ private void process() throws Exception {
try {
            for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
Long tableId = entry.getKey();
@@ -852,34 +865,26 @@ public final class SparkDpp implements java.io.Serializable {
}
processRollupTree(rootNode, tablePairRDD, tableId, baseIndex);
}
- spark.stop();
+ LOG.info("invalid rows contents:" + invalidRows.value());
+ dppResult.isSuccess = true;
+ dppResult.failedReason = "";
} catch (Exception exception) {
LOG.warn("spark dpp failed for exception:" + exception);
dppResult.isSuccess = false;
dppResult.failedReason = exception.getMessage();
+ throw exception;
+ } finally {
+ spark.stop();
            dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
dppResult.scannedRows = scannedRowsAcc.value();
dppResult.fileNumber = fileNumberAcc.value();
dppResult.fileSize = fileSizeAcc.value();
dppResult.abnormalRows = abnormalRowAcc.value();
dppResult.partialAbnormalRows = invalidRows.value();
- throw exception;
}
- LOG.info("invalid rows contents:" + invalidRows.value());
- dppResult.isSuccess = true;
- dppResult.failedReason = "";
- dppResult.normalRows = scannedRowsAcc.value() - abnormalRowAcc.value();
- dppResult.scannedRows = scannedRowsAcc.value();
- dppResult.fileNumber = fileNumberAcc.value();
- dppResult.fileSize = fileSizeAcc.value();
- dppResult.abnormalRows = abnormalRowAcc.value();
- dppResult.partialAbnormalRows = invalidRows.value();
- return dppResult;
}
- public void doDpp() throws Exception {
- // write dpp result to output
- DppResult dppResult = process();
+ private void writeDppResult(DppResult dppResult) throws Exception {
String outputPath = etlJobConfig.getOutputPath();
String resultFilePath = outputPath + "/" + DPP_RESULT_FILE;
URI uri = new URI(outputPath);
@@ -891,5 +896,16 @@ public final class SparkDpp implements java.io.Serializable {
outputStream.write('\n');
outputStream.close();
}
+
+ public void doDpp() throws Exception {
+ try {
+ process();
+ } catch (Exception e) {
+ throw e;
+ } finally {
+ // write dpp result to file in outputPath
+ writeDppResult(dppResult);
+ }
+ }
}
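
The essence of fix #1, condensed from the diff above: process() records success or
failure into the shared dppResult, and writeDppResult runs in a finally block, so a
result file is always produced for the FE to read, even when dpp throws:

    public void doDpp() throws Exception {
        try {
            process();  // fills dppResult; rethrows on failure
        } finally {
            // write dpp result to file in outputPath, on success and on failure
            writeDppResult(dppResult);
        }
    }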
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9dec697..fe81aeb 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -111,7 +111,10 @@ import java.util.Map;
},
"where": "k2 > 10",
"isNegative": false,
- "hiveTableName": "hive_db.table"
+ "hiveDbTableName": "hive_db.table",
+ "hiveTableProperties": {
+ "hive.metastore.uris": "thrift://host:port"
+ }
}]
}
},
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]