vibhatha commented on code in PR #13914:
URL: https://github.com/apache/arrow/pull/13914#discussion_r964457293
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1814,5 +1920,1049 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, ProjectRel) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  // A ProjectRel without an emit clause: the expected output is every input column
+  // (A, B, C) followed by the projected expression equal(A, B).
+  compute::ExecContext exec_context;
+  auto dummy_schema =
+      schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto input_table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 5, 20],
+      [4, 1, 30],
+      [2, 1, 40],
+      [5, 5, 50],
+      [2, 2, 60]
+  ])"});
+
+  std::string file_prefix = "serde_project_test";
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_project_tempdir"));
+
+  TempDataGenerator datagen(input_table, file_prefix, tempdir);
+  ASSERT_OK(datagen());
+  std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+  // Keep the plan strictly valid JSON (RFC 8259): no comma after the last element
+  // of an array/object, so the test does not depend on JSON-parser leniency.
+  std::string substrait_json = R"({
+    "relations": [{
+      "rel": {
+        "project": {
+          "expressions": [{
+            "scalarFunction": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 0
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }, {
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }],
+              "output_type": {
+                "bool": {}
+              }
+            }
+          }],
+          "input" : {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": ")" +
+                               substrait_file_uri +
+                               R"(",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          }
+        }
+      }
+    }],
+    "extension_uris": [
+      {
+        "extension_uri_anchor": 0,
+        "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+                               R"("
+      }
+    ],
+    "extensions": [
+      {"extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }}
+    ]
+  })";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto output_schema = schema({field("A", int32()), field("B", int32()),
+                               field("C", int32()), field("equal", boolean())});
+  // Last column is equal(A, B) evaluated per row.
+  auto expected_table = TableFromJSON(output_schema, {R"([
+      [1, 1, 10, true],
+      [3, 5, 20, false],
+      [4, 1, 30, false],
+      [2, 1, 40, false],
+      [5, 5, 50, true],
+      [2, 2, 60, true]
+  ])"});
+  EmitValidate(std::move(output_schema), std::move(expected_table), exec_context, buf)();
+}
+
+TEST(Substrait, ProjectRelOnFunctionWithEmit) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  // A ProjectRel whose emit clause selects columns [0, 2, 3] of the projected
+  // output (A, B, C, equal(A, B)), i.e. A, C and the expression result.
+  compute::ExecContext exec_context;
+  auto dummy_schema =
+      schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto input_table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 5, 20],
+      [4, 1, 30],
+      [2, 1, 40],
+      [5, 5, 50],
+      [2, 2, 60]
+  ])"});
+
+  std::string file_prefix = "serde_project_emit_test";
+  ASSERT_OK_AND_ASSIGN(auto tempdir, arrow::internal::TemporaryDir::Make(
+                                         "substrait_project_emit_tempdir"));
+
+  TempDataGenerator datagen(input_table, file_prefix, tempdir);
+  ASSERT_OK(datagen());
+  std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+  // Keep the plan strictly valid JSON (RFC 8259): no comma after the last element
+  // of an array/object, so the test does not depend on JSON-parser leniency.
+  std::string substrait_json = R"({
+    "relations": [{
+      "rel": {
+        "project": {
+          "common": {
+            "emit": {
+              "outputMapping": [0, 2, 3]
+            }
+          },
+          "expressions": [{
+            "scalarFunction": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 0
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }, {
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 1
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }],
+              "output_type": {
+                "bool": {}
+              }
+            }
+          }],
+          "input" : {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": ")" +
+                               substrait_file_uri +
+                               R"(",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          }
+        }
+      }
+    }],
+    "extension_uris": [
+      {
+        "extension_uri_anchor": 0,
+        "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+                               R"("
+      }
+    ],
+    "extensions": [
+      {"extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }}
+    ]
+  })";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto output_schema =
+      schema({field("A", int32()), field("C", int32()), field("equal", boolean())});
+  // B is dropped by the emit mapping; the last column is still equal(A, B).
+  auto expected_table = TableFromJSON(output_schema, {R"([
+      [1, 10, true],
+      [3, 20, false],
+      [4, 30, false],
+      [2, 40, false],
+      [5, 50, true],
+      [2, 60, true]
+  ])"});
+  EmitValidate(std::move(output_schema), std::move(expected_table), exec_context, buf)();
+}
+
+TEST(Substrait, ReadRelWithEmit) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  // A ReadRel whose emit clause keeps only columns [1, 2] (B and C) of the
+  // base schema (A, B, C).
+  compute::ExecContext exec_context;
+  auto dummy_schema =
+      schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto input_table = TableFromJSON(dummy_schema, {R"([
+      [1, 1, 10],
+      [3, 4, 20]
+  ])"});
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_read_tempdir"));
+  std::string file_prefix = "serde_read_emit_test";
+
+  TempDataGenerator datagen(input_table, file_prefix, tempdir);
+  ASSERT_OK(datagen());
+  std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+  // Keep the plan strictly valid JSON (RFC 8259): the original had a trailing comma
+  // after the "relations" array, which only parses under a lenient JSON reader.
+  std::string substrait_json = R"({
+    "relations": [{
+      "rel": {
+        "read": {
+          "common": {
+            "emit": {
+              "outputMapping": [1, 2]
+            }
+          },
+          "base_schema": {
+            "names": ["A", "B", "C"],
+            "struct": {
+              "types": [{
+                "i32": {}
+              }, {
+                "i32": {}
+              }, {
+                "i32": {}
+              }]
+            }
+          },
+          "local_files": {
+            "items": [
+              {
+                "uri_file": ")" + substrait_file_uri +
+                           R"(",
+                "parquet": {}
+              }
+            ]
+          }
+        }
+      }
+    }]
+  })";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto output_schema = schema({field("B", int32()), field("C", int32())});
+  auto expected_table = TableFromJSON(output_schema, {R"([
+      [1, 10],
+      [4, 20]
+  ])"});
+  EmitValidate(std::move(output_schema), std::move(expected_table), exec_context, buf)();
+}
+
+TEST(Substrait, FilterRelWithEmit) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  // A FilterRel with condition equal(A, C) whose emit clause keeps only columns
+  // [1, 3] (B and D) of the input (A, B, C, D).
+  compute::ExecContext exec_context;
+  auto dummy_schema = schema({field("A", int32()), field("B", int32()),
+                              field("C", int32()), field("D", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto input_table = TableFromJSON(dummy_schema, {R"([
+      [10, 1, 80, 7],
+      [20, 2, 70, 6],
+      [30, 3, 30, 5],
+      [40, 4, 20, 4],
+      [40, 5, 40, 3],
+      [20, 6, 20, 2],
+      [30, 7, 30, 1]
+  ])"});
+
+  // Name the temp artifacts after this test (they were copied from the read test).
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_filter_tempdir"));
+  std::string file_prefix = "serde_filter_emit_test";
+
+  TempDataGenerator datagen(input_table, file_prefix, tempdir);
+  ASSERT_OK(datagen());
+  std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+  std::string substrait_json = R"({
+    "relations": [{
+      "rel": {
+        "filter": {
+          "common": {
+            "emit": {
+              "outputMapping": [1, 3]
+            }
+          },
+          "condition": {
+            "scalarFunction": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 0
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }, {
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 2
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }],
+              "output_type": {
+                "bool": {}
+              }
+            }
+          },
+          "input" : {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C", "D"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": ")" +
+                               substrait_file_uri +
+                               R"(",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          }
+        }
+      }
+    }],
+    "extension_uris": [
+      {
+        "extension_uri_anchor": 0,
+        "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+                               R"("
+      }
+    ],
+    "extensions": [
+      {"extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }}
+    ]
+  })";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  auto output_schema = schema({field("B", int32()), field("D", int32())});
+  // Rows with A == C are (30,3,30,5), (40,5,40,3), (20,6,20,2), (30,7,30,1);
+  // only their B and D columns are emitted.
+  auto expected_table = TableFromJSON(output_schema, {R"([
+      [3, 5],
+      [5, 3],
+      [6, 2],
+      [7, 1]
+  ])"});
+  EmitValidate(std::move(output_schema), std::move(expected_table), exec_context, buf)();
+}
+
+TEST(Substrait, JoinRelEndToEnd) {
+#ifdef _WIN32
+  GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+  // An inner JoinRel (no emit clause) between two ReadRels, joined with an
+  // equal(field 0, field 0) expression.
+  compute::ExecContext exec_context;
+  auto left_schema = schema({field("A", int32()), field("B", int32()),
+                             field("C", int32()), field("D", int32())});
+
+  auto right_schema = schema({field("X", int32()), field("Y", int32()),
+                              field("Z", int32()), field("W", int32())});
+
+  // creating a dummy dataset using a dummy table
+  auto left_table = TableFromJSON(left_schema, {R"([
+      [10, 1, 80, 70],
+      [20, 2, 70, 60],
+      [30, 3, 30, 50]
+  ])"});
+
+  auto right_table = TableFromJSON(right_schema, {R"([
+      [10, 1, 81, 71],
+      [80, 2, 71, 61],
+      [31, 3, 31, 51]
+  ])"});
+
+  ASSERT_OK_AND_ASSIGN(auto tempdir,
+                       arrow::internal::TemporaryDir::Make("substrait_join_tempdir"));
+  // This test has no emit clause, so the file prefixes do not say "emit".
+  std::string left_file_prefix = "serde_join_left_test";
+  std::string right_file_prefix = "serde_join_right_test";
+
+  TempDataGenerator datagen_left(left_table, left_file_prefix, tempdir);
+  ASSERT_OK(datagen_left());
+  std::string substrait_left_file_uri = "file://" + datagen_left.data_file_path;
+
+  TempDataGenerator datagen_right(right_table, right_file_prefix, tempdir);
+  ASSERT_OK(datagen_right());
+  std::string substrait_right_file_uri = "file://" + datagen_right.data_file_path;
+
+  // NOTE(review): both join-expression arguments reference field 0. The expected
+  // result below (only A == X == 10 matches) assumes the consumer resolves the
+  // first argument against the left input and the second against the right
+  // input -- confirm against the join deserializer.
+  std::string substrait_json = R"({
+    "relations": [{
+      "rel": {
+        "join": {
+          "left": {
+            "read": {
+              "base_schema": {
+                "names": ["A", "B", "C", "D"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": ")" +
+                               substrait_left_file_uri +
+                               R"(",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "right": {
+            "read": {
+              "base_schema": {
+                "names": ["X", "Y", "Z", "W"],
+                "struct": {
+                  "types": [{
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }, {
+                    "i32": {}
+                  }]
+                }
+              },
+              "local_files": {
+                "items": [
+                  {
+                    "uri_file": ")" +
+                               substrait_right_file_uri +
+                               R"(",
+                    "parquet": {}
+                  }
+                ]
+              }
+            }
+          },
+          "expression": {
+            "scalarFunction": {
+              "functionReference": 0,
+              "arguments": [{
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 0
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }, {
+                "value": {
+                  "selection": {
+                    "directReference": {
+                      "structField": {
+                        "field": 0
+                      }
+                    },
+                    "rootReference": {
+                    }
+                  }
+                }
+              }],
+              "output_type": {
+                "bool": {}
+              }
+            }
+          },
+          "type": "JOIN_TYPE_INNER"
+        }
+      }
+    }],
+    "extension_uris": [
+      {
+        "extension_uri_anchor": 0,
+        "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+                               R"("
+      }
+    ],
+    "extensions": [
+      {"extension_function": {
+        "extension_uri_reference": 0,
+        "function_anchor": 0,
+        "name": "equal"
+      }}
+    ]
+  })";
+
+  ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan", substrait_json));
+  // Without an emit clause the joined output also carries each side's scan
+  // bookkeeping columns (the __*_l / __*_r fields below).
+  auto output_schema = schema({
+      field("A", int32()),
+      field("B", int32()),
+      field("C", int32()),
+      field("D", int32()),
+      field("__fragment_index_l", int32()),
+      field("__batch_index_l", int32()),
+      field("__last_in_fragment_l", boolean()),
+      field("__filename_l", utf8()),
+      field("X", int32()),
+      field("Y", int32()),
+      field("Z", int32()),
+      field("W", int32()),
+      field("__fragment_index_r", int32()),
+      field("__batch_index_r", int32()),
+      field("__last_in_fragment_r", boolean()),
+      field("__filename_r", utf8()),
+  });
+
+  // include these columns for comparison: only the data columns of each side, since
+  // the bookkeeping columns (e.g. file names) depend on the temp directory.
+  std::vector<int> include_columns{0, 1, 2, 3, 8, 9, 10, 11};
+  auto compared_output_schema = schema({
+      field("A", int32()),
+      field("B", int32()),
+      field("C", int32()),
+      field("D", int32()),
+      field("X", int32()),
+      field("Y", int32()),
+      field("Z", int32()),
+      field("W", int32()),
+  });
+  auto expected_table = TableFromJSON(std::move(compared_output_schema), {R"([
+      [10, 1, 80, 70, 10, 1, 81, 71]
+  ])"});
+  EmitValidate(std::move(output_schema), std::move(expected_table), exec_context, buf,
+               std::move(include_columns))();
+}
+
+TEST(Substrait, JoinRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto left_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ auto right_schema = schema({field("X", int32()), field("Y", int32()),
+ field("Z", int32()), field("W", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto left_table = TableFromJSON(left_schema, {R"([
+ [10, 1, 80, 70],
+ [20, 2, 70, 60],
+ [30, 3, 30, 50]
+ ])"});
+
+ auto right_table = TableFromJSON(right_schema, {R"([
+ [10, 1, 81, 71],
+ [80, 2, 71, 61],
+ [31, 3, 31, 51]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_join_tempdir"));
+ std::string left_file_prefix = "serde_join_left_emit_test";
+ std::string right_file_prefix = "serde_join_right_emit_test";
+
+ TempDataGenerator datagen_left(left_table, left_file_prefix, tempdir);
+ ASSERT_OK(datagen_left());
+ std::string substrait_left_file_uri = "file://" +
datagen_left.data_file_path;
+
+ TempDataGenerator datagen_right(right_table, right_file_prefix, tempdir);
+ ASSERT_OK(datagen_right());
+ std::string substrait_right_file_uri = "file://" +
datagen_right.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "join": {
+ "common": {
+ "emit": {
+ "outputMapping": [0, 1, 2, 3, 8, 9, 10, 11]
+ }
+ },
+ "left": {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_left_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "right": {
+ "read": {
+ "base_schema": {
+ "names": ["X", "Y", "Z", "W"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_right_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "expression": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "type": "JOIN_TYPE_INNER"
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({
+ field("A", int32()),
+ field("B", int32()),
+ field("C", int32()),
+ field("D", int32()),
+ field("X", int32()),
+ field("Y", int32()),
+ field("Z", int32()),
+ field("W", int32()),
+ });
Review Comment:
Done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]