This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new aae942b [Spark Load][Bug] Keep the column splitting in spark load
consistent with broker load / mini load (#4532)
aae942b is described below
commit aae942b9822bd08c40377d2bea4b14fc830d37ed
Author: xy720 <[email protected]>
AuthorDate: Sun Sep 6 20:33:26 2020 +0800
[Spark Load][Bug] Keep the column splitting in spark load consistent with
broker load / mini load (#4532)
---
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 23 +++++++++++++++++++++-
.../apache/doris/load/loadv2/etl/EtlJobConfig.java | 9 +--------
2 files changed, 23 insertions(+), 9 deletions(-)
diff --git
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index 6e5a714..fd71add 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -62,6 +62,7 @@ import java.io.IOException;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
+import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -553,12 +554,13 @@ public final class SparkDpp implements
java.io.Serializable {
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
parsers.add(ColumnParser.create(column));
}
+ char separator = (char)fileGroup.columnSeparator.getBytes(Charset.forName("UTF-8"))[0];
// now we first support csv file
// TODO: support parquet file and orc file
JavaRDD<Row> rowRDD = sourceDataRdd.flatMap(
record -> {
scannedRowsAcc.add(1);
- String[] attributes = record.split(fileGroup.columnSeparator);
+ String[] attributes = splitLine(record, separator);
List<Row> result = new ArrayList<>();
boolean validRow = true;
if (attributes.length != columnSize) {
@@ -640,6 +642,25 @@ public final class SparkDpp implements
java.io.Serializable {
return srcSchema;
}
+ // This method is to keep the splitting consistent with broker load / mini load
+ private String[] splitLine(String line, char sep) {
+ if (line == null || line.equals("")) {
+ return new String[0];
+ }
+ int index = 0;
+ int lastIndex = 0;
+ // line-begin char and line-end char are considered to be 'delimiter'
+ List<String> values = new ArrayList<>();
+ for (int i = 0 ; i < line.length(); i++, index++) {
+ if (line.charAt(index) == sep) {
+ values.add(line.substring(lastIndex, index));
+ lastIndex = index + 1;
+ }
+ }
+ values.add(line.substring(lastIndex, index));
+ return values.toArray(new String[0]);
+ }
+
// partition keys will be parsed into double from json
// so need to convert it to partition columns' type
private Object convertPartitionKey(Object srcValue, Class dstClass) throws
SparkDppException {
diff --git
a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
index 9ee4d83..8238aea 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/etl/EtlJobConfig.java
@@ -503,20 +503,13 @@ public class EtlJobConfig implements Serializable {
this.filePaths = filePaths;
this.fileFieldNames = fileFieldNames;
this.columnsFromPath = columnsFromPath;
+ this.columnSeparator = Strings.isNullOrEmpty(columnSeparator) ? "\t" : columnSeparator;
this.lineDelimiter = lineDelimiter;
this.isNegative = isNegative;
this.fileFormat = fileFormat;
this.columnMappings = columnMappings;
this.where = where;
this.partitions = partitions;
-
- // Convert some special characters in column separator
- char sep = Strings.isNullOrEmpty(columnSeparator) ? '\t' : columnSeparator.charAt(0);
- if (".$|()[]{}^?*+\\".indexOf(sep) != -1) {
- this.columnSeparator = new String(new char[]{'\\', sep});
- } else {
- this.columnSeparator = Character.toString(sep);
- }
}
// for data from table
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]