This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new e44038c [feature-wip](array-type) Array data can be loaded in stream
load. (#8368) (#8585)
e44038c is described below
commit e44038caf3df0bbb1bb19f87f628a1732a80c7bc
Author: Adonis Ling <[email protected]>
AuthorDate: Tue Mar 22 15:25:40 2022 +0800
[feature-wip](array-type) Array data can be loaded in stream load. (#8368)
(#8585)
Please refer to #8367.
---
be/src/exprs/cast_functions.cpp | 7 +
be/src/exprs/cast_functions.h | 2 +
be/src/util/array_parser.hpp | 212 +++++++++++++++++++++
be/test/util/CMakeLists.txt | 1 +
be/test/util/array_parser_test.cpp | 134 +++++++++++++
.../java/org/apache/doris/analysis/CastExpr.java | 19 +-
.../main/java/org/apache/doris/catalog/Type.java | 4 +-
7 files changed, 371 insertions(+), 8 deletions(-)
diff --git a/be/src/exprs/cast_functions.cpp b/be/src/exprs/cast_functions.cpp
index baadd09..8903d5b 100644
--- a/be/src/exprs/cast_functions.cpp
+++ b/be/src/exprs/cast_functions.cpp
@@ -26,6 +26,7 @@
#include "runtime/datetime_value.h"
#include "runtime/string_value.h"
#include "string_functions.h"
+#include "util/array_parser.hpp"
#include "util/mysql_global.h"
#include "util/string_parser.hpp"
@@ -357,4 +358,10 @@ DateTimeVal
CastFunctions::cast_to_date_val(FunctionContext* ctx, const StringVa
return result;
}
+// Casts a JSON array string (e.g. "[1, 2, 3]") to an ARRAY value whose element type is
+// taken from the function's return type. A NULL input, or text that ArrayParser cannot
+// convert into the target array type, yields NULL.
+CollectionVal CastFunctions::cast_to_array_val(FunctionContext* context, const StringVal& val) {
+    // Short-circuit NULL input rather than handing a null pointer to the JSON parser.
+    if (val.is_null) {
+        return CollectionVal::null();
+    }
+    CollectionVal array_val;
+    const Status status = ArrayParser::parse(array_val, context, val);
+    return status.ok() ? array_val : CollectionVal::null();
+}
+
} // namespace doris
diff --git a/be/src/exprs/cast_functions.h b/be/src/exprs/cast_functions.h
index 0160275..fa75d7c 100644
--- a/be/src/exprs/cast_functions.h
+++ b/be/src/exprs/cast_functions.h
@@ -136,6 +136,8 @@ public:
static DateTimeVal cast_to_date_val(FunctionContext* context, const
DoubleVal& val);
static DateTimeVal cast_to_date_val(FunctionContext* context, const
DateTimeVal& val);
static DateTimeVal cast_to_date_val(FunctionContext* context, const
StringVal& val);
+
+ static CollectionVal cast_to_array_val(FunctionContext* context, const
StringVal& val);
};
} // namespace doris
diff --git a/be/src/util/array_parser.hpp b/be/src/util/array_parser.hpp
new file mode 100644
index 0000000..746695a
--- /dev/null
+++ b/be/src/util/array_parser.hpp
@@ -0,0 +1,212 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <rapidjson/document.h>
+
+#include <cstdint>
+#include <unordered_map>
+
+#include "common/status.h"
+#include "exprs/anyval_util.h"
+#include "runtime/collection_value.h"
+#include "runtime/primitive_type.h"
+#include "runtime/types.h"
+#include "util/mem_util.hpp"
+
+namespace doris {
+
+// Const array wrapper over a rapidjson value that uses `Encoding`.
+template <typename Encoding>
+using ConstArray = typename rapidjson::GenericValue<Encoding>::ConstArray;
+
+// Iterator produced by walking a ConstArray<Encoding>: a pointer to a const value,
+// i.e. the same type as GenericValue<Encoding>::ConstValueIterator.
+template <typename Encoding>
+using ConstArrayIterator = typename rapidjson::GenericValue<Encoding>::ConstValueIterator;
+
+// Parses a JSON array (e.g. `[1, null, 3]`) into a CollectionVal. The element type is
+// taken from the calling function's return type, which is expected to be an ARRAY type.
+class ArrayParser {
+public:
+    // Parses `str_val` into `array_val`.
+    //
+    // - A NULL `str_val` or the JSON literal `null` yields a NULL CollectionVal.
+    // - Invalid JSON, a non-array top-level value, an element whose JSON type does not
+    //   match the element type, an unsupported element type, or a failed allocation
+    //   yields a RuntimeError.
+    // Memory for the collection and its elements is allocated through `context`.
+    static Status parse(CollectionVal& array_val, FunctionContext* context,
+                        const StringVal& str_val) {
+        if (str_val.is_null) {
+            array_val = CollectionVal::null();
+            return Status::OK();
+        }
+        rapidjson::Document document;
+        if (document.Parse(reinterpret_cast<const char*>(str_val.ptr), str_val.len)
+                    .HasParseError()) {
+            return Status::RuntimeError("Failed to parse the json to array.");
+        }
+        // Must be checked before IsArray(): a JSON `null` is not an array, so testing
+        // IsArray() first would turn it into a parse error instead of a NULL value.
+        if (document.IsNull()) {
+            array_val = CollectionVal::null();
+            return Status::OK();
+        }
+        if (!document.IsArray()) {
+            return Status::RuntimeError("Failed to parse the json to array.");
+        }
+        const TypeDescriptor type_desc = _convert_to_type_descriptor(context->get_return_type());
+        // A const reference makes GetArray() return the ConstArray that _parse expects.
+        const rapidjson::Document& const_document = document;
+        return _parse<rapidjson::UTF8<>>(array_val, context, const_document.GetArray(),
+                                         type_desc);
+    }
+
+private:
+    // Recursively converts a UDF-level type descriptor (including its children) into a
+    // runtime TypeDescriptor. Types missing from _types_mapping map to a default
+    // TypeDescriptor.
+    static TypeDescriptor _convert_to_type_descriptor(
+            const FunctionContext::TypeDesc& function_type_desc) {
+        const auto iterator = _types_mapping.find(function_type_desc.type);
+        if (iterator == _types_mapping.end()) {
+            return TypeDescriptor();
+        }
+        TypeDescriptor type_desc(iterator->second);
+        type_desc.len = function_type_desc.len;
+        type_desc.precision = function_type_desc.precision;
+        type_desc.scale = function_type_desc.scale;
+        for (const auto& child_type_desc : function_type_desc.children) {
+            type_desc.children.push_back(_convert_to_type_descriptor(child_type_desc));
+        }
+        return type_desc;
+    }
+
+    // Converts a JSON array into `array_val`, whose element type is
+    // type_desc.children[0]. JSON nulls become NULL elements.
+    template <typename Encoding>
+    static Status _parse(CollectionVal& array_val, FunctionContext* context,
+                         const ConstArray<Encoding>& array, const TypeDescriptor& type_desc) {
+        // Reject a non-array target type (e.g. an unmapped return type); otherwise
+        // children[0] below would be an out-of-bounds access.
+        if (type_desc.type != TYPE_ARRAY || type_desc.children.empty()) {
+            return Status::RuntimeError("Failed to parse the json to array.");
+        }
+        if (array.Empty()) {
+            CollectionValue(0).to_collection_val(&array_val);
+            return Status::OK();
+        }
+        const auto& child_type_desc = type_desc.children[0];
+        const auto item_type = child_type_desc.type;
+        CollectionValue collection_value;
+        RETURN_IF_ERROR(CollectionValue::init_collection(context, array.Size(), item_type,
+                                                         &collection_value));
+        uint32_t index = 0;
+        for (auto it = array.Begin(); it != array.End(); ++it, ++index) {
+            if (it->IsNull()) {
+                auto null = AnyVal(true);
+                RETURN_IF_ERROR(collection_value.set(index, item_type, &null));
+                continue;
+            }
+            if (!_is_type_valid<Encoding>(it, item_type)) {
+                return Status::RuntimeError("Failed to parse the json to array.");
+            }
+            AnyVal* val = nullptr;
+            RETURN_IF_ERROR(_parse<Encoding>(&val, context, it, child_type_desc));
+            RETURN_IF_ERROR(collection_value.set(index, item_type, val));
+        }
+        collection_value.to_collection_val(&array_val);
+        return Status::OK();
+    }
+
+    // Returns true if `value` is representable as T without truncation.
+    template <typename T>
+    static bool _fits(int value) {
+        return static_cast<int>(static_cast<T>(value)) == value;
+    }
+
+    // Checks that the JSON value at `iterator` has a JSON type compatible with `type`.
+    // Integer targets require an integral JSON number in range, so the Get*() accessors
+    // used by the conversion below are never called on a value of the wrong kind.
+    template <typename Encoding>
+    static bool _is_type_valid(const ConstArrayIterator<Encoding> iterator,
+                               const PrimitiveType type) {
+        switch (type) {
+        case TYPE_NULL:
+            return iterator->IsNull();
+        case TYPE_BOOLEAN:
+            return iterator->IsBool();
+        case TYPE_TINYINT:
+            return iterator->IsInt() && _fits<int8_t>(iterator->GetInt());
+        case TYPE_SMALLINT:
+            return iterator->IsInt() && _fits<int16_t>(iterator->GetInt());
+        case TYPE_INT:
+            return iterator->IsInt();
+        case TYPE_BIGINT:
+            return iterator->IsInt64();
+        case TYPE_LARGEINT:
+        case TYPE_FLOAT:
+        case TYPE_DOUBLE:
+            return iterator->IsNumber();
+        case TYPE_DATE:
+        case TYPE_DATETIME:
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_HLL:
+        case TYPE_STRING:
+            return iterator->IsString();
+        case TYPE_OBJECT:
+            return iterator->IsObject();
+        case TYPE_ARRAY:
+            return iterator->IsArray();
+        default:
+            return false;
+        }
+    }
+
+    // Allocates storage for a T from `context` and constructs it in place from `args`.
+    // Returns nullptr when the context cannot provide the memory.
+    template <typename T, typename... Args>
+    static T* _create(FunctionContext* context, Args... args) {
+        uint8_t* buffer = context->allocate(static_cast<int>(sizeof(T)));
+        return buffer == nullptr ? nullptr : new (buffer) T(args...);
+    }
+
+    static Status _allocation_failed() {
+        return Status::RuntimeError("Failed to allocate memory while parsing json array.");
+    }
+
+    // Converts the single JSON value at `iterator` into a newly allocated AnyVal of
+    // type `type_desc` and stores it in `*val`. Nested arrays recurse into the
+    // CollectionVal overload.
+    template <typename Encoding>
+    static Status _parse(AnyVal** val, FunctionContext* context,
+                         const ConstArrayIterator<Encoding> iterator,
+                         const TypeDescriptor& type_desc) {
+        switch (type_desc.type) {
+        case TYPE_ARRAY: {
+            auto* collection_val = _create<CollectionVal>(context);
+            if (collection_val == nullptr) {
+                return _allocation_failed();
+            }
+            *val = collection_val;
+            return _parse<Encoding>(*collection_val, context, iterator->GetArray(), type_desc);
+        }
+        case TYPE_BOOLEAN:
+            *val = _create<BooleanVal>(context, iterator->GetBool());
+            break;
+        case TYPE_TINYINT:
+            *val = _create<TinyIntVal>(context, iterator->GetInt());
+            break;
+        case TYPE_SMALLINT:
+            *val = _create<SmallIntVal>(context, iterator->GetInt());
+            break;
+        case TYPE_INT:
+            *val = _create<IntVal>(context, iterator->GetInt());
+            break;
+        case TYPE_BIGINT:
+            *val = _create<BigIntVal>(context, iterator->GetInt64());
+            break;
+        case TYPE_CHAR:
+        case TYPE_VARCHAR:
+        case TYPE_STRING: {
+            const int length = static_cast<int>(iterator->GetStringLength());
+            uint8_t* buffer = context->allocate(length);
+            if (length > 0) {
+                if (buffer == nullptr) {
+                    return _allocation_failed();
+                }
+                memory_copy(buffer, iterator->GetString(), length);
+            }
+            *val = _create<StringVal>(context, buffer, length);
+            break;
+        }
+        default:
+            return Status::RuntimeError("Failed to parse json to type (" +
+                                        std::to_string(type_desc.type) + ").");
+        }
+        if (*val == nullptr) {
+            return _allocation_failed();
+        }
+        return Status::OK();
+    }
+
+    // Maps UDF-level types to runtime primitive types. `inline` (C++17) gives this
+    // header-defined static member a single definition across translation units.
+    inline static const std::unordered_map<FunctionContext::Type, PrimitiveType>
+            _types_mapping = {
+                    {FunctionContext::INVALID_TYPE, PrimitiveType::INVALID_TYPE},
+                    {FunctionContext::TYPE_NULL, PrimitiveType::TYPE_NULL},
+                    {FunctionContext::TYPE_BOOLEAN, PrimitiveType::TYPE_BOOLEAN},
+                    {FunctionContext::TYPE_TINYINT, PrimitiveType::TYPE_TINYINT},
+                    {FunctionContext::TYPE_SMALLINT, PrimitiveType::TYPE_SMALLINT},
+                    {FunctionContext::TYPE_INT, PrimitiveType::TYPE_INT},
+                    {FunctionContext::TYPE_BIGINT, PrimitiveType::TYPE_BIGINT},
+                    {FunctionContext::TYPE_LARGEINT, PrimitiveType::TYPE_LARGEINT},
+                    {FunctionContext::TYPE_FLOAT, PrimitiveType::TYPE_FLOAT},
+                    {FunctionContext::TYPE_DOUBLE, PrimitiveType::TYPE_DOUBLE},
+                    {FunctionContext::TYPE_DECIMAL_DEPRACTED, PrimitiveType::TYPE_DECIMAL_DEPRACTED},
+                    {FunctionContext::TYPE_DATE, PrimitiveType::TYPE_DATE},
+                    {FunctionContext::TYPE_DATETIME, PrimitiveType::TYPE_DATETIME},
+                    {FunctionContext::TYPE_CHAR, PrimitiveType::TYPE_CHAR},
+                    {FunctionContext::TYPE_VARCHAR, PrimitiveType::TYPE_VARCHAR},
+                    {FunctionContext::TYPE_HLL, PrimitiveType::TYPE_HLL},
+                    {FunctionContext::TYPE_STRING, PrimitiveType::TYPE_STRING},
+                    {FunctionContext::TYPE_DECIMALV2, PrimitiveType::TYPE_DECIMALV2},
+                    {FunctionContext::TYPE_OBJECT, PrimitiveType::TYPE_OBJECT},
+                    {FunctionContext::TYPE_ARRAY, PrimitiveType::TYPE_ARRAY},
+            };
+};
+
+} // namespace doris
diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt
index b3ccb88..12daf38 100644
--- a/be/test/util/CMakeLists.txt
+++ b/be/test/util/CMakeLists.txt
@@ -75,5 +75,6 @@ ADD_BE_TEST(sort_heap_test)
ADD_BE_TEST(counts_test)
ADD_BE_TEST(date_func_test)
ADD_BE_TEST(tuple_row_zorder_compare_test)
+ADD_BE_TEST(array_parser_test)
target_link_libraries(Test_util Common Util Gutil ${Boost_LIBRARIES} glog
gflags fmt protobuf)
diff --git a/be/test/util/array_parser_test.cpp
b/be/test/util/array_parser_test.cpp
new file mode 100644
index 0000000..cbda494
--- /dev/null
+++ b/be/test/util/array_parser_test.cpp
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <string>
+#include <util/array_parser.hpp>
+
+#include "gutil/casts.h"
+#include "olap/types.h"
+#include "runtime/free_pool.hpp"
+#include "runtime/mem_pool.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/string_value.h"
+#include "udf/udf.h"
+#include "udf/udf_internal.h"
+
+namespace doris {
+
+using TypeDesc = FunctionContext::TypeDesc;
+
+// Builds a FunctionContext::TypeDesc for `type`; each further argument becomes the single
+// child of the level before it, e.g. (TYPE_ARRAY, TYPE_INT) describes ARRAY<INT>.
+// ARRAY levels get OLAP_ARRAY_MAX_BYTES as their length, other levels 0; remaining
+// fields are value-initialized.
+template <typename... Ts>
+TypeDesc create_function_type_desc(FunctionContext::Type type, Ts... sub_types) {
+    // Value-initialize, then assign: designated initializers are a C++20 feature.
+    TypeDesc type_desc {};
+    type_desc.type = type;
+    type_desc.len = (type == FunctionContext::TYPE_ARRAY) ? OLAP_ARRAY_MAX_BYTES : 0;
+    // Compare explicitly: before C++23 an `if constexpr` condition may not narrow a
+    // count greater than 1 to bool, which would break nesting such as ARRAY<ARRAY<INT>>.
+    if constexpr (sizeof...(sub_types) > 0) {
+        type_desc.children.push_back(create_function_type_desc(sub_types...));
+    }
+    return type_desc;
+}
+
+// Converts a function-level type descriptor (recursively) into the ColumnPB consumed by
+// TabletColumn. Only ARRAY, INT and VARCHAR are mapped; other types leave the column
+// type unset.
+ColumnPB create_column_pb(const TypeDesc& function_type_desc) {
+    ColumnPB column_pb;
+    column_pb.set_length(function_type_desc.len);
+    switch (function_type_desc.type) {
+    case FunctionContext::TYPE_ARRAY:
+        column_pb.set_type("ARRAY");
+        break;
+    case FunctionContext::TYPE_INT:
+        column_pb.set_type("INT");
+        break;
+    case FunctionContext::TYPE_VARCHAR:
+        column_pb.set_type("VARCHAR");
+        break;
+    default:
+        break;
+    }
+    // Bind by const reference: each child TypeDesc owns a vector and need not be copied.
+    for (const auto& child_type_desc : function_type_desc.children) {
+        auto sub_column_pb = create_column_pb(child_type_desc);
+        column_pb.add_children_columns()->Swap(&sub_column_pb);
+    }
+    return column_pb;
+}
+
+// Resolves the olap TypeInfo for a function-level type by routing it through a
+// TabletColumn built from the equivalent ColumnPB.
+std::shared_ptr<const TypeInfo> get_type_info(const TypeDesc& function_type_desc) {
+    TabletColumn column;
+    column.init_from_pb(create_column_pb(function_type_desc));
+    return get_type_info(&column);
+}
+
+// Runs ArrayParser::parse on `json` with `function_type_desc` as the function's return
+// type, and checks that it succeeds and that the result equals `expect`.
+void test_array_parser(const TypeDesc& function_type_desc, const std::string& json,
+                       const CollectionValue& expect) {
+    MemTracker mem_tracker(1024 * 1024, "ArrayParserTest");
+    MemPool pool(&mem_tracker);
+    std::unique_ptr<FunctionContext> context(new FunctionContext());
+    auto* impl = context->impl();
+    impl->_return_type = function_type_desc;
+    // Allocations made through the context are served from `pool`.
+    impl->_pool = new FreePool(&pool);
+
+    CollectionVal parsed;
+    const StringVal input(json.c_str());
+    EXPECT_TRUE(ArrayParser::parse(parsed, context.get(), input).ok());
+
+    const auto actual = CollectionValue::from_collection_val(parsed);
+    const auto type_info = get_type_info(function_type_desc);
+    EXPECT_TRUE(type_info->equal(&expect, &actual));
+}
+
+TEST(ArrayParserTest, TestParseIntArray) {
+    const auto type_desc =
+            create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_INT);
+
+    // An empty JSON array yields an empty collection.
+    test_array_parser(type_desc, "[]", CollectionValue(0));
+
+    // Plain integers.
+    constexpr int kNumItems = 3;
+    int32_t items[kNumItems] = {1, 2, 3};
+    CollectionValue expected(items, kNumItems, false, nullptr);
+    test_array_parser(type_desc, "[1, 2, 3]", expected);
+
+    // A JSON null marks the middle element as NULL.
+    bool null_signs[kNumItems] = {false, true, false};
+    expected.set_has_null(true);
+    expected.set_null_signs(null_signs);
+    test_array_parser(type_desc, "[1, null, 3]", expected);
+}
+
+TEST(ArrayParserTest, TestParseVarcharArray) {
+    const auto type_desc =
+            create_function_type_desc(FunctionContext::TYPE_ARRAY, FunctionContext::TYPE_VARCHAR);
+
+    // An empty JSON array yields an empty collection.
+    test_array_parser(type_desc, "[]", CollectionValue(0));
+
+    // Three single-character strings backed by one character buffer.
+    constexpr int kNumItems = 3;
+    char chars[kNumItems] = {'a', 'b', 'c'};
+    StringValue strings[kNumItems] = {{chars, 1}, {chars + 1, 1}, {chars + 2, 1}};
+    CollectionValue expected(strings, kNumItems, false, nullptr);
+    test_array_parser(type_desc, "[\"a\", \"b\", \"c\"]", expected);
+
+    // A JSON null marks the middle element as NULL.
+    bool null_signs[kNumItems] = {false, true, false};
+    expected.set_has_null(true);
+    expected.set_null_signs(null_signs);
+    test_array_parser(type_desc, "[\"a\", null, \"c\"]", expected);
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+    // Let gtest consume its own command-line flags, then run every registered test.
+    ::testing::InitGoogleTest(&argc, argv);
+    const int exit_code = RUN_ALL_TESTS();
+    return exit_code;
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
index 387cf1a..c39b2b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CastExpr.java
@@ -241,12 +241,19 @@ public class CastExpr extends Expr {
this.opcode = TExprOpcode.CAST;
FunctionName fnName = new FunctionName(getFnName(type));
Function searchDesc = new Function(fnName,
Arrays.asList(collectChildReturnTypes()), Type.INVALID, false);
- if (isImplicit) {
- fn = Catalog.getCurrentCatalog().getFunction(
- searchDesc,
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
- } else {
- fn = Catalog.getCurrentCatalog().getFunction(
- searchDesc, Function.CompareMode.IS_IDENTICAL);
+ if (type.isScalarType()) {
+ if (isImplicit) {
+ fn = Catalog.getCurrentCatalog().getFunction(
+ searchDesc,
Function.CompareMode.IS_NONSTRICT_SUPERTYPE_OF);
+ } else {
+ fn = Catalog.getCurrentCatalog().getFunction(
+ searchDesc, Function.CompareMode.IS_IDENTICAL);
+ }
+ } else if (type.isArrayType()){
+ fn = ScalarFunction.createBuiltin(getFnName(Type.ARRAY),
+ type, Function.NullableMode.ALWAYS_NULLABLE,
+ Lists.newArrayList(Type.VARCHAR), false ,
+ "doris::CastFunctions::cast_to_array_val", null, null,
true);
}
if (fn == null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
index bcb72d5..3f0b8d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
@@ -81,6 +81,7 @@ public abstract class Type {
// Only used for alias function, to represent any type in function args
public static final ScalarType ALL = new ScalarType(PrimitiveType.ALL);
public static final MapType Map = new MapType();
+ public static final ArrayType ARRAY = ArrayType.create();
private static ArrayList<ScalarType> integerTypes;
private static ArrayList<ScalarType> numericTypes;
@@ -123,7 +124,6 @@ public abstract class Type {
supportedTypes.add(DECIMALV2);
supportedTypes.add(TIME);
supportedTypes.add(STRING);
-
}
public static ArrayList<ScalarType> getIntegerTypes() {
@@ -387,7 +387,7 @@ public abstract class Type {
} else if (t1.isArrayType() && t2.isArrayType()) {
return ArrayType.canCastTo((ArrayType)t1, (ArrayType)t2);
}
- return t1.isNull();
+ return t1.isNull() || t1.getPrimitiveType() == PrimitiveType.VARCHAR;
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]