This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2c24fe8 [SparkDpp] Support complete types (#4524)
2c24fe8 is described below
commit 2c24fe80fa76d4afbeeecaacbb11e29e07e84fd9
Author: wangbo <[email protected]>
AuthorDate: Sun Sep 13 11:57:33 2020 +0800
[SparkDpp] Support complete types (#4524)
For [Spark Load]
1 support decimal and largeint
2 add validate logic for char/varchar/decimal
3 check data load from hive with strict mode
4 support decimal/date/datetime aggregator
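As context for reviewers: the new validation is driven by ColumnParser.create(), which picks a
per-type parser from the column definition. A minimal usage sketch (the EtlColumn setup and
variable names here are hypothetical; only the class and method names come from this change):

    // Sketch: validate a raw string against a DECIMALV2(5,3) column definition.
    EtlJobConfig.EtlColumn col = new EtlJobConfig.EtlColumn();
    col.columnType = "DECIMALV2";
    col.precision = 5;
    col.scale = 3;
    ColumnParser parser = ColumnParser.create(col);  // throws SparkDppException for unknown types
    boolean ok = parser.parse("10.333");             // true: value fits the decimal bounds
    boolean bad = parser.parse("abc");               // false: not a number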
---
be/src/olap/push_handler.cpp | 2 +-
.../apache/doris/load/loadv2/dpp/ColumnParser.java | 89 +++++++++--
.../org/apache/doris/load/loadv2/dpp/SparkDpp.java | 170 ++++++++++++++++++---
.../doris/load/loadv2/dpp/SparkRDDAggregator.java | 53 +++++--
.../doris/load/loadv2/dpp/ColumnParserTest.java | 135 ++++++++++++++++
.../apache/doris/load/loadv2/dpp/SparkDppTest.java | 67 ++++++++
6 files changed, 467 insertions(+), 49 deletions(-)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 2a06329..cfd221d 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -1026,7 +1026,7 @@ OLAPStatus PushBrokerReader::next(ContiguousRow* row) {
const void* value = _tuple->get_slot(slot->tuple_offset());
// try execute init method defined in aggregateInfo
// by default it only copies data into cell
- _schema->column(i)->consume(&cell, (const char*)value, is_null,
+ _schema->column(i)->consume(&cell, (const char*)value, is_null,
_mem_pool.get(),
_runtime_state->obj_pool());
            // if column(i) is a value column, try execute finalize method defined in aggregateInfo
// to convert data into final format
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
index d5e0cee..c9d6a42 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/ColumnParser.java
@@ -19,13 +19,20 @@ package org.apache.doris.load.loadv2.dpp;
import org.apache.doris.common.SparkDppException;
import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import java.io.Serializable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.Date;
// Parser to validate value for different type
public abstract class ColumnParser implements Serializable {
+
+    protected static final Logger LOG = LogManager.getLogger(ColumnParser.class);
+
    public static ColumnParser create(EtlJobConfig.EtlColumn etlColumn) throws SparkDppException {
String columnType = etlColumn.columnType;
if (columnType.equalsIgnoreCase("TINYINT")) {
@@ -51,6 +58,10 @@ public abstract class ColumnParser implements Serializable {
|| columnType.equalsIgnoreCase("BITMAP")
|| columnType.equalsIgnoreCase("HLL")) {
return new StringParser(etlColumn);
+ } else if (columnType.equalsIgnoreCase("DECIMALV2")) {
+ return new DecimalParser(etlColumn);
+ } else if (columnType.equalsIgnoreCase("LARGEINT")) {
+ return new LargeIntParser();
} else {
throw new SparkDppException("unsupported type:" + columnType);
}
@@ -63,10 +74,7 @@ class TinyIntParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- Short parsed = Short.parseShort(value);
- if (parsed > 127 || parsed < -128) {
- return false;
- }
+ Byte.parseByte(value);
} catch (NumberFormatException e) {
return false;
}
@@ -102,7 +110,7 @@ class BigIntParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- Integer.parseInt(value);
+ Long.parseLong(value);
} catch (NumberFormatException e) {
return false;
}
@@ -114,11 +122,11 @@ class FloatParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- Float.parseFloat(value);
+ Float ret = Float.parseFloat(value);
+ return !ret.isNaN() && !ret.isInfinite();
} catch (NumberFormatException e) {
return false;
}
- return true;
}
}
@@ -126,11 +134,11 @@ class DoubleParser extends ColumnParser {
@Override
public boolean parse(String value) {
try {
- Double.parseDouble(value);
+ Double ret = Double.parseDouble(value);
+ return !ret.isInfinite() && !ret.isNaN();
} catch (NumberFormatException e) {
return false;
}
- return true;
}
}
@@ -186,4 +194,67 @@ class StringParser extends ColumnParser {
throw new RuntimeException("string check failed ", e);
}
}
+}
+
+class DecimalParser extends ColumnParser {
+
+ public static int PRECISION = 27;
+ public static int SCALE = 9;
+
+ private BigDecimal maxValue;
+ private BigDecimal minValue;
+
+ public DecimalParser(EtlJobConfig.EtlColumn etlColumn) {
+ StringBuilder precisionStr = new StringBuilder();
+ for (int i = 0; i < etlColumn.precision - etlColumn.scale; i++) {
+ precisionStr.append("9");
+ }
+ StringBuilder scaleStr = new StringBuilder();
+ for (int i = 0; i < etlColumn.scale; i++) {
+ scaleStr.append("9");
+ }
+        maxValue = new BigDecimal(precisionStr.toString() + "." + scaleStr.toString());
+        minValue = new BigDecimal("-" + precisionStr.toString() + "." + scaleStr.toString());
+ }
+
+ @Override
+ public boolean parse(String value) {
+ try {
+ BigDecimal bigDecimal = new BigDecimal(value);
+            return bigDecimal.precision() - bigDecimal.scale() <= PRECISION - SCALE && bigDecimal.scale() <= SCALE;
+ } catch (NumberFormatException e) {
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("decimal parse failed ", e);
+ }
+ }
+
+ public BigDecimal getMaxValue() {
+ return maxValue;
+ }
+
+ public BigDecimal getMinValue() {
+ return minValue;
+ }
+}
+
+class LargeIntParser extends ColumnParser {
+
+    private BigInteger maxValue = new BigInteger("170141183460469231731687303715884105727");
+    private BigInteger minValue = new BigInteger("-170141183460469231731687303715884105728");
+
+ @Override
+ public boolean parse(String value) {
+ try {
+ BigInteger inputValue = new BigInteger(value);
+            return inputValue.compareTo(maxValue) <= 0 && inputValue.compareTo(minValue) >= 0;
+ } catch (NumberFormatException e) {
+ return false;
+ } catch (ArithmeticException e) {
+ LOG.warn("int value is too big even for java BigInteger,value={}"
+ value);
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("large int parse failed:" + value, e);
+ }
+ }
}
\ No newline at end of file
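A note on DecimalParser above: its min/max bounds are built by repeating "9" according to the
column definition, so precision=5, scale=3 yields maxValue 99.999 and minValue -99.999. A
standalone sketch of the same construction (class and method names here are illustrative only):

    import java.math.BigDecimal;

    public class DecimalBoundsDemo {
        // Build the largest value representable by DECIMAL(precision, scale),
        // mirroring the loop in DecimalParser's constructor.
        static BigDecimal maxOf(int precision, int scale) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < precision - scale; i++) {
                sb.append('9');
            }
            sb.append('.');
            for (int i = 0; i < scale; i++) {
                sb.append('9');
            }
            return new BigDecimal(sb.toString());
        }

        public static void main(String[] args) {
            System.out.println(maxOf(5, 3));  // 99.999
            System.out.println(maxOf(3, 2));  // 9.99, as asserted in SparkDppTest below
        }
    }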
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
index fd71add..e4c3a23 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkDpp.java
@@ -40,6 +40,7 @@ import org.apache.spark.Partitioner;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
@@ -59,6 +60,8 @@ import org.apache.spark.util.LongAccumulator;
import org.apache.spark.util.SerializableConfiguration;
import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URI;
import java.net.URISyntaxException;
@@ -107,6 +110,8 @@ public final class SparkDpp implements java.io.Serializable {
private SerializableConfiguration serializableHadoopConf;
private DppResult dppResult = new DppResult();
+ // just for ut
+ public SparkDpp() {}
public SparkDpp(SparkSession spark, EtlJobConfig etlJobConfig) {
this.spark = spark;
@@ -180,6 +185,8 @@ public final class SparkDpp implements java.io.Serializable {
long tableId,
EtlJobConfig.EtlIndex indexMeta,
SparkRDDAggregator[] sparkRDDAggregators) throws SparkDppException {
+        // TODO(wb) should handle largeint as BigInteger instead of String when using BigInteger as key,
+ // data type may affect sorting logic
        StructType dstSchema = DppUtils.createDstTableSchema(indexMeta.columns, false, true);
ExpressionEncoder encoder = RowEncoder.apply(dstSchema);
@@ -359,12 +366,49 @@ public final class SparkDpp implements java.io.Serializable {
         return Pair.of(keyMap.toArray(new Integer[keyMap.size()]), valueMap.toArray(new Integer[valueMap.size()]));
}
- // repartition dataframe by partitionid_bucketid
- // so data in the same bucket will be consecutive.
-    private JavaPairRDD<List<Object>, Object[]> fillTupleWithPartitionColumn(SparkSession spark, Dataset<Row> dataframe,
+ /**
+ * check decimal,char/varchar
+ */
+    public boolean validateData(Object srcValue, EtlJobConfig.EtlColumn etlColumn, ColumnParser columnParser, Row row) {
+
+ switch (etlColumn.columnType.toUpperCase()) {
+ case "DECIMALV2":
+                // TODO(wb): support decimal round; see be DecimalV2Value::round
+ DecimalParser decimalParser = (DecimalParser) columnParser;
+ BigDecimal srcBigDecimal = (BigDecimal) srcValue;
+                if (srcValue != null && (decimalParser.getMaxValue().compareTo(srcBigDecimal) < 0 || decimalParser.getMinValue().compareTo(srcBigDecimal) > 0)) {
+                    LOG.warn(String.format("decimal value is not valid for definition, column=%s, value=%s, precision=%s, scale=%s",
+                            etlColumn.columnName, srcValue.toString(), srcBigDecimal.precision(), srcBigDecimal.scale()));
+ return false;
+ }
+ break;
+ case "CHAR":
+ case "VARCHAR":
+ // TODO(wb) padding char type
+ try {
+ int strSize = 0;
+                    if (srcValue != null && (strSize = srcValue.toString().getBytes("UTF-8").length) > etlColumn.stringLength) {
+                        LOG.warn(String.format("the length of input is longer than schema allows. column_name:%s, input_str[%s], schema length:%s, actual length:%s",
+                                etlColumn.columnName, row.toString(), etlColumn.stringLength, strSize));
+ return false;
+ }
+ } catch (UnsupportedEncodingException e) {
+ LOG.warn("input string value can not encode with
utf-8,value=" + srcValue.toString());
+ return false;
+ }
+ break;
+ }
+ return true;
+ }
+
+ /**
+ * 1 project column and reorder column
+ * 2 validate data
+ * 3 fill tuple with partition column
+ */
+    private JavaPairRDD<List<Object>, Object[]> fillTupleWithPartitionColumn(Dataset<Row> dataframe,
                                                               EtlJobConfig.EtlPartitionInfo partitionInfo,
                                                               List<Integer> partitionKeyIndex,
-                                                              List<Class> partitionKeySchema,
                                                               List<DorisRangePartitioner.PartitionRangeKey> partitionRangeKeys,
                                                               List<String> keyColumnNames,
                                                               List<String> valueColumnNames,
@@ -385,25 +429,42 @@ public final class SparkDpp implements
java.io.Serializable {
}
}
}
+
+ List<ColumnParser> parsers = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+ parsers.add(ColumnParser.create(column));
+ }
+
        // use PairFlatMapFunction instead of PairMapFunction because there will be
        // 0 or 1 output row for 1 input row
        JavaPairRDD<List<Object>, Object[]> resultPairRDD = dataframe.toJavaRDD().flatMapToPair(new PairFlatMapFunction<Row, List<Object>, Object[]>() {
@Override
            public Iterator<Tuple2<List<Object>, Object[]>> call(Row row) throws Exception {
+                List<Tuple2<List<Object>, Object[]>> result = new ArrayList<>();
List<Object> keyColumns = new ArrayList<>();
- Object[] valueColumns = new Object[valueColumnNames.size()];
- for (String columnName : keyColumnNames) {
+                List<Object> valueColumns = new ArrayList<>(valueColumnNames.size());
+ for (int i = 0; i < keyColumnNames.size(); i++) {
+ String columnName = keyColumnNames.get(i);
Object columnObject = row.get(row.fieldIndex(columnName));
+                    if (!validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(i), row)) {
+                        abnormalRowAcc.add(1);
+                        return result.iterator();
+                    }
keyColumns.add(columnObject);
}
for (int i = 0; i < valueColumnNames.size(); i++) {
-                    valueColumns[i] = row.get(row.fieldIndex(valueColumnNames.get(i)));
+ String columnName = valueColumnNames.get(i);
+ Object columnObject = row.get(row.fieldIndex(columnName));
+                    if (!validateData(columnObject, baseIndex.getColumn(columnName), parsers.get(i + keyColumnNames.size()), row)) {
+                        abnormalRowAcc.add(1);
+                        return result.iterator();
+                    }
+ valueColumns.add(columnObject);
}
DppColumns key = new DppColumns(keyColumns);
int pid = partitioner.getPartition(key);
-                List<Tuple2<List<Object>, Object[]>> result = new ArrayList<>();
if (!validPartitionIndex.contains(pid)) {
LOG.warn("invalid partition for row:" + row + ", pid:" +
pid);
abnormalRowAcc.add(1);
@@ -414,6 +475,7 @@ public final class SparkDpp implements java.io.Serializable {
                        LOG.info("invalid rows contents:" + invalidRows.value());
}
} else {
+                    // TODO(wb) support largeint for hash
                    long hashValue = DppUtils.getHashValue(row, distributeColumns, dstTableSchema);
                    int bucketId = (int) ((hashValue & 0xffffffff) % partitionInfo.partitions.get(pid).bucketNum);
                    long partitionId = partitionInfo.partitions.get(pid).partitionId;
@@ -423,7 +485,7 @@ public final class SparkDpp implements java.io.Serializable {
List<Object> tuple = new ArrayList<>();
tuple.add(bucketKey);
tuple.addAll(keyColumns);
- result.add(new Tuple2<>(tuple, valueColumns));
+ result.add(new Tuple2<>(tuple, valueColumns.toArray()));
}
return result.iterator();
}
@@ -508,17 +570,6 @@ public final class SparkDpp implements java.io.Serializable {
            dataframe = dataframe.withColumn(mappingColumn, functions.expr(mappingDescription).cast(dstTableSchema.apply(mappingColumn).dataType()));
}
- // projection and reorder the columns
- dataframe.createOrReplaceTempView("src_table");
- StringBuilder selectSqlBuilder = new StringBuilder();
- selectSqlBuilder.append("select ");
- for (String name : dstColumnNames) {
- selectSqlBuilder.append(name + ",");
- }
- selectSqlBuilder.deleteCharAt(selectSqlBuilder.length() - 1);
- selectSqlBuilder.append(" from src_table");
- String selectSql = selectSqlBuilder.toString();
- dataframe = spark.sql(selectSql);
return dataframe;
}
@@ -588,7 +639,8 @@ public final class SparkDpp implements java.io.Serializable {
                            int index = dstColumnNameToIndex.get(srcColumnName);
                            String type = columns.get(index).columnType;
if (type.equalsIgnoreCase("CHAR")
- ||
type.equalsIgnoreCase("VARCHAR")) {
+ || type.equalsIgnoreCase("VARCHAR")
+ ||
fileGroup.columnMappings.containsKey(field.name())) {
continue;
}
ColumnParser parser = parsers.get(index);
@@ -675,6 +727,9 @@ public final class SparkDpp implements java.io.Serializable {
} else if (dstClass.equals(Long.class)) {
return ((Double) srcValue).longValue();
} else if (dstClass.equals(BigInteger.class)) {
+ // TODO(wb) gson will cast origin value to double by default
+            // when the partition column is largeint, this will cause wrong data
+ // need fix it thoroughly
return new BigInteger(((Double) srcValue).toString());
        } else if (dstClass.equals(java.sql.Date.class) || dstClass.equals(java.util.Date.class)) {
double srcValueDouble = (double)srcValue;
@@ -800,10 +855,78 @@ public final class SparkDpp implements java.io.Serializable {
});
sql.deleteCharAt(sql.length() - 1).append(" from
").append(hiveDbTableName);
Dataset<Row> dataframe = spark.sql(sql.toString());
+        dataframe = checkDataFromHiveWithStrictMode(dataframe, baseIndex, fileGroup.columnMappings.keySet(), etlJobConfig.properties.strictMode,
+                dstTableSchema);
        dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
return dataframe;
}
+    private Dataset<Row> checkDataFromHiveWithStrictMode(
+            Dataset<Row> dataframe, EtlJobConfig.EtlIndex baseIndex, Set<String> mappingColKeys, boolean isStrictMode, StructType dstTableSchema) throws SparkDppException {
+        List<EtlJobConfig.EtlColumn> columnNameNeedCheckArrayList = new ArrayList<>();
+ List<ColumnParser> columnParserArrayList = new ArrayList<>();
+ for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
+ if (!StringUtils.equalsIgnoreCase(column.columnType, "varchar") &&
+ !StringUtils.equalsIgnoreCase(column.columnType, "char") &&
+ !mappingColKeys.contains(column.columnName)) {
+ columnNameNeedCheckArrayList.add(column);
+ columnParserArrayList.add(ColumnParser.create(column));
+ }
+ }
+
+        ColumnParser[] columnParserArray = columnParserArrayList.toArray(new ColumnParser[columnParserArrayList.size()]);
+        EtlJobConfig.EtlColumn[] columnNameArray = columnNameNeedCheckArrayList.toArray(new EtlJobConfig.EtlColumn[columnNameNeedCheckArrayList.size()]);
+
+        JavaRDD<Row> result = dataframe.toJavaRDD().flatMap(new FlatMapFunction<Row, Row>() {
+ @Override
+ public Iterator<Row> call(Row row) throws Exception {
+ List<Row> result = new ArrayList<>();
+                Set<Integer> columnIndexNeedToReplaceNull = new HashSet<Integer>();
+ boolean validRow = true;
+ for (int i = 0; i < columnNameArray.length; i++) {
+ EtlJobConfig.EtlColumn column = columnNameArray[i];
+ int fieldIndex = row.fieldIndex(column.columnName);
+ Object value = row.get(fieldIndex);
+ if (value == null && !column.isAllowNull) {
+ validRow = false;
+ LOG.warn("column:" + i + " can not be null. row:" +
row.toString());
+ break;
+ }
+                    if (value != null && !columnParserArray[i].parse(value.toString())) {
+ if (isStrictMode) {
+ validRow = false;
+ LOG.warn(String.format("row parsed failed in
strict mode, column name %s, src row %s", column.columnName, row.toString()));
+ } else {
+                            columnIndexNeedToReplaceNull.add(fieldIndex);
+ }
+ }
+ }
+ if (!validRow) {
+ abnormalRowAcc.add(1);
+ // at most add 5 rows to invalidRows
+ if (abnormalRowAcc.value() <= 5) {
+ invalidRows.add(row.toString());
+ }
+                } else if (columnIndexNeedToReplaceNull.size() != 0) {
+ Object[] newRow = new Object[row.size()];
+ for (int i = 0; i < row.size(); i++) {
+                        if (columnIndexNeedToReplaceNull.contains(i)) {
+ newRow[i] = null;
+ } else {
+ newRow[i] = row.get(i);
+ }
+ }
+ result.add(RowFactory.create(newRow));
+ } else {
+ result.add(row);
+ }
+ return result.iterator();
+ }
+ });
+
+ return spark.createDataFrame(result, dstTableSchema);
+ }
+
private void process() throws Exception {
try {
            for (Map.Entry<Long, EtlJobConfig.EtlTable> entry : etlJobConfig.tables.entrySet()) {
@@ -872,9 +995,10 @@ public final class SparkDpp implements java.io.Serializable {
unselectedRowAcc.add(currentSize - originalSize);
}
-                JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(spark, fileGroupDataframe,
+                JavaPairRDD<List<Object>, Object[]> ret = fillTupleWithPartitionColumn(
+ fileGroupDataframe,
partitionInfo, partitionKeyIndex,
- partitionKeySchema, partitionRangeKeys,
+ partitionRangeKeys,
keyColumnNames, valueColumnNames,
dstTableSchema, baseIndex, fileGroup.partitions);
if (tablePairRDD == null) {
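To summarize the strict-mode semantics introduced in checkDataFromHiveWithStrictMode above: a row
is dropped (and counted in abnormalRowAcc) when a non-nullable column is null, or when a value
fails its ColumnParser while strict mode is on; with strict mode off the offending value is
replaced with null and the row is kept. A condensed, hypothetical sketch of that per-value rule
(not the committed code):

    enum RowAction { KEEP, DROP, NULL_OUT }

    // One value's fate under the new strict-mode check.
    static RowAction check(Object value, boolean isAllowNull, boolean parseOk, boolean strictMode) {
        if (value == null) {
            return isAllowNull ? RowAction.KEEP : RowAction.DROP;
        }
        if (!parseOk) {
            return strictMode ? RowAction.DROP : RowAction.NULL_OUT;
        }
        return RowAction.KEEP;
    }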
diff --git a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
index 4682fdc..7f1d030 100644
--- a/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
+++ b/fe/spark-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java
@@ -32,6 +32,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
@@ -52,7 +53,6 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
return value;
};
- // TODO(wb) support more datatype:decimal,date,datetime
    public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws SparkDppException {
String aggType = StringUtils.lowerCase(column.aggregationType);
String columnType = StringUtils.lowerCase(column.columnType);
@@ -69,6 +69,9 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
case "bigint":
case "float":
case "double":
+ case "decimalv2":
+ case "date":
+ case "datetime":
return new NumberMaxAggregator();
case "char":
case "varchar":
@@ -86,6 +89,9 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
case "bigint":
case "float":
case "double":
+ case "decimalv2":
+ case "date":
+ case "datetime":
return new NumberMinAggregator();
case "char":
case "varchar":
@@ -111,6 +117,8 @@ public abstract class SparkRDDAggregator<T> implements Serializable {
return new DoubleSumAggregator();
case "largeint":
return new LargeIntSumAggregator();
+ case "decimalv2":
+ return new BigDecimalSumAggregator();
default:
throw new SparkDppException(String.format("unsupported
sum aggregator for column type:%s", columnType));
}
@@ -324,6 +332,12 @@ class LargeIntMaxAggregator extends SparkRDDAggregator<BigInteger> {
}
return dst.compareTo(src) > 0 ? dst : src;
}
+
+ @Override
+ String finalize(Object value) {
+ BigInteger bigInteger = (BigInteger) value;
+ return bigInteger.toString();
+ }
}
class LargeIntMinAggregator extends LargeIntMaxAggregator {
@@ -394,7 +408,6 @@ class LongSumAggregator extends SparkRDDAggregator<Long> {
if (dst == null) {
return src;
}
- // TODO(wb) check overflow of long type
return dst + src;
}
}
@@ -409,11 +422,9 @@ class ShortSumAggregator extends SparkRDDAggregator<Short> {
if (dst == null) {
return src;
}
- Integer ret = dst + src;
- if (ret > Short.MAX_VALUE || ret < Short.MIN_VALUE) {
- throw new RuntimeException("short column sum size exceeds
Short.MAX_VALUE or Short.MIN_VALUE");
- }
- return Short.valueOf(ret.toString());
+ int ret = dst + src;
+ // here may overflow, just keep the same logic with be
+ return (short)ret;
}
}
@@ -428,9 +439,7 @@ class IntSumAggregator extends SparkRDDAggregator<Integer> {
return src;
}
long ret = Long.sum(dst, src);
- if (ret > Integer.MAX_VALUE || ret < Integer.MIN_VALUE) {
- throw new RuntimeException("int column sum size exceeds
Integer.MAX_VALUE or Integer.MIN_VALUE");
- }
+ // here may overflow, just keep the same logic with be
return (int) ret;
}
}
@@ -445,11 +454,9 @@ class ByteSumAggregator extends SparkRDDAggregator<Byte> {
if (dst == null) {
return src;
}
- Integer ret = dst + src;
- if (ret > Byte.MAX_VALUE || ret < Byte.MIN_VALUE) {
- throw new RuntimeException("byte column sum size exceeds
Byte.MAX_VALUE or Byte.MIN_VALUE");
- }
- return Byte.valueOf(ret.toString());
+ int ret = dst + src;
+ // here may overflow, just keep the same logic with be
+ return (byte)ret;
}
}
@@ -467,7 +474,6 @@ class DoubleSumAggregator extends SparkRDDAggregator<Double> {
}
}
-// TODO(wb) add bound check for float/double
class FloatSumAggregator extends SparkRDDAggregator<Float> {
@Override
@@ -510,6 +516,21 @@ class StringMinAggregator extends SparkRDDAggregator<String> {
}
}
+class BigDecimalSumAggregator extends SparkRDDAggregator<BigDecimal> {
+
+
+ @Override
+ BigDecimal update(BigDecimal src, BigDecimal dst) {
+ if (src == null) {
+ return dst;
+ }
+ if (dst == null) {
+ return src;
+ }
+ return src.add(dst);
+ }
+}
+
class BucketComparator implements Comparator<List<Object>>, Serializable {
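One behavioral change worth calling out in the aggregators above: ShortSumAggregator,
IntSumAggregator, and ByteSumAggregator no longer throw on overflow; they truncate with a
narrowing cast to stay consistent with the BE. A quick standalone illustration of what that
wrap-around means in Java (demo class name is hypothetical):

    public class SumWrapDemo {
        public static void main(String[] args) {
            short dst = Short.MAX_VALUE;      // 32767
            short src = 1;
            int ret = dst + src;              // 32768, exact in int arithmetic
            System.out.println((short) ret);  // -32768: the narrowing cast wraps, as on the BE side
        }
    }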
diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java
new file mode 100644
index 0000000..cfb4122
--- /dev/null
+++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/ColumnParserTest.java
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ColumnParserTest {
+
+ // TODO(wb) try to keep ut consistent with be's ut
+ @Test
+ public void testBoundCheck() {
+ // tinyint
+ TinyIntParser tinyIntParser = new TinyIntParser();
+ // 1 normal
+ String tinyint = "100";
+ Assert.assertTrue(tinyIntParser.parse(tinyint));
+ // 2 upper
+ String tinyintUpper = "128";
+ Assert.assertFalse(tinyIntParser.parse(tinyintUpper));
+ // 3 lower
+ String tinyintLower = "-129";
+ Assert.assertFalse(tinyIntParser.parse(tinyintLower));
+
+ // smallint
+ SmallIntParser smallIntParser = new SmallIntParser();
+ // 1 normal
+ String smallint = "100";
+ Assert.assertTrue(smallIntParser.parse(smallint));
+ // 2 upper
+ String smallintUpper = "32768";
+ Assert.assertFalse(smallIntParser.parse(smallintUpper));
+ // 3 lower
+ String smallintLower = "-32769";
+ Assert.assertFalse(smallIntParser.parse(smallintLower));
+
+ // int
+ IntParser intParser = new IntParser();
+ // 1 normal
+ String intValue = "100";
+ Assert.assertTrue(intParser.parse(intValue));
+ // 2 upper
+ String intUpper = "2147483648";
+ Assert.assertFalse(intParser.parse(intUpper));
+ // 3 lower
+ String intLower = "-2147483649";
+ Assert.assertFalse(intParser.parse(intLower));
+
+ // bigint
+ BigIntParser bigIntParser = new BigIntParser();
+ // 1 normal
+ String bigint = "100";
+ Assert.assertTrue(bigIntParser.parse(bigint));
+ // 2 upper
+ String bigintUpper = "9223372036854775808";
+ Assert.assertFalse(bigIntParser.parse(bigintUpper));
+ // 3 lower
+ String bigintLower = "-9223372036854775809";
+ Assert.assertFalse(bigIntParser.parse(bigintLower));
+
+ // largeint
+ LargeIntParser largeIntParser = new LargeIntParser();
+ // 1 normal
+ String largeint = "100";
+ Assert.assertTrue(largeIntParser.parse(largeint));
+ // 2 upper
+ String largeintUpper = "170141183460469231731687303715884105728";
+ Assert.assertFalse(largeIntParser.parse(largeintUpper));
+ // 3 lower
+ String largeintLower = "-170141183460469231731687303715884105729";
+ Assert.assertFalse(largeIntParser.parse(largeintLower));
+
+ // float
+ FloatParser floatParser = new FloatParser();
+ // normal
+ String floatValue = "1.1";
+ Assert.assertTrue(floatParser.parse(floatValue));
+ // inf
+ String inf = "Infinity";
+ Assert.assertFalse(floatParser.parse(inf));
+ // nan
+ String nan = "NaN";
+ // failed
+ Assert.assertFalse(floatParser.parse(nan));
+
+ // double
+ DoubleParser doubleParser = new DoubleParser();
+ // normal
+ Assert.assertTrue(doubleParser.parse(floatValue));
+ // inf
+ Assert.assertFalse(doubleParser.parse(inf));
+ // nan
+ Assert.assertFalse(doubleParser.parse(nan));
+
+ // decimal
+ EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn();
+ etlColumn.precision = 5;
+ etlColumn.scale = 3;
+ DecimalParser decimalParser = new DecimalParser(etlColumn);
+ // normal
+ String decimalValue = "10.333";
+ Assert.assertTrue(decimalParser.parse(decimalValue));
+ // overflow
+ String decimalOverflow = "1000.3333333333";
+ Assert.assertFalse(decimalParser.parse(decimalOverflow));
+
+ // string
+ EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn();
+ stringColumn.stringLength = 3;
+ StringParser stringParser = new StringParser(stringColumn);
+ // normal
+ String stringnormal = "a";
+ Assert.assertTrue(stringParser.parse(stringnormal));
+ // overflow
+ String stringoverflow = "中文";
+ Assert.assertFalse(stringParser.parse(stringoverflow));
+ }
+
+}
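A reading aid for the char/varchar cases in ColumnParserTest above and SparkDppTest below:
stringLength is compared against the UTF-8 byte length, not the character count, which is why
"中文" overflows a length-3 column while "中" alone fits. A standalone check (illustrative only;
the committed code uses getBytes("UTF-8") rather than StandardCharsets):

    import java.nio.charset.StandardCharsets;

    public class Utf8LengthDemo {
        public static void main(String[] args) {
            System.out.println("a1".getBytes(StandardCharsets.UTF_8).length);  // 2: fits length 3
            System.out.println("中".getBytes(StandardCharsets.UTF_8).length);  // 3: fits length 3
            System.out.println("中a".getBytes(StandardCharsets.UTF_8).length); // 4: overflows length 3
        }
    }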
diff --git a/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java
new file mode 100644
index 0000000..9cec220
--- /dev/null
+++ b/fe/spark-dpp/src/test/java/org/apache/doris/load/loadv2/dpp/SparkDppTest.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package org.apache.doris.load.loadv2.dpp;
+
+import org.apache.doris.load.loadv2.etl.EtlJobConfig;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+
+public class SparkDppTest {
+
+ @Test
+ public void testValidateData() {
+ SparkDpp sparkDpp = new SparkDpp();
+
+ // decimal
+ EtlJobConfig.EtlColumn etlColumn = new EtlJobConfig.EtlColumn();
+ etlColumn.columnType = "DECIMALV2";
+ etlColumn.precision = 3;
+ etlColumn.scale = 2;
+
+ DecimalParser decimalParser = new DecimalParser(etlColumn);
+ // test max/min
+        Assert.assertTrue(decimalParser.getMaxValue().toString().equals("9.99"));
+        Assert.assertTrue(decimalParser.getMinValue().toString().equals("-9.99"));
+ // normal
+ BigDecimal bigDecimal = new BigDecimal("1.21");
+        Assert.assertTrue(sparkDpp.validateData(bigDecimal, etlColumn, decimalParser, RowFactory.create(bigDecimal)));
+ // failed
+ BigDecimal bigDecimalFailed = new BigDecimal("10");
+        Assert.assertFalse(sparkDpp.validateData(bigDecimalFailed, etlColumn, decimalParser, RowFactory.create(bigDecimalFailed)));
+
+ // string
+ EtlJobConfig.EtlColumn stringColumn = new EtlJobConfig.EtlColumn();
+ stringColumn.stringLength = 3;
+ stringColumn.columnType = "VARCHAR";
+ StringParser stringParser = new StringParser(stringColumn);
+ // normal
+ String normalString = "a1";
+        Assert.assertTrue(sparkDpp.validateData(normalString, stringColumn, stringParser, RowFactory.create(normalString)));
+ // cn normal
+ String normalStringCN = "中";
+        Assert.assertTrue(sparkDpp.validateData(normalStringCN, stringColumn, stringParser, RowFactory.create(normalStringCN)));
+ // cn failed
+ String failedStringCN = "中a";
+        Assert.assertFalse(sparkDpp.validateData(failedStringCN, stringColumn, stringParser, RowFactory.create(failedStringCN)));
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]