westonpace commented on code in PR #13914:
URL: https://github.com/apache/arrow/pull/13914#discussion_r959848046
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -121,6 +155,78 @@ inline compute::Expression UseBoringRefs(const
compute::Expression& expr) {
return compute::Expression{std::move(modified_call)};
}
+// TODO: complete this interface
+struct TempDataGenerator {
+ TempDataGenerator(const std::shared_ptr<Table> input_table,
+ const std::string& file_prefix,
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir)
+ : input_table(input_table), file_prefix(file_prefix), tempdir(tempdir) {}
+
+ Status operator()() {
+ auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+ auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+ const std::string file_name = file_prefix + ".parquet";
+
+ ARROW_ASSIGN_OR_RAISE(auto file_path, tempdir->path().Join(file_name));
+ data_file_path = file_path.ToString();
+
+ std::string toReplace("/T//");
+ size_t pos = data_file_path.find(toReplace);
+ data_file_path.replace(pos, toReplace.length(), "/T/");
+
+ ARROW_EXPECT_OK(WriteParquetData(data_file_path, filesystem, input_table));
+ return Status::OK();
+ }
+
+ std::shared_ptr<Table> input_table;
+ std::string file_prefix;
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir;
+ std::string data_file_path;
+};
+
+struct EmitValidate {
+ EmitValidate(const std::shared_ptr<Schema> output_schema,
+ const std::shared_ptr<Table> expected_table,
+ compute::ExecContext& exec_context, std::shared_ptr<Buffer>&
buf,
+ const std::vector<int>& include_columns = {})
+ : output_schema(output_schema),
+ expected_table(expected_table),
+ exec_context(exec_context),
+ buf(buf),
+ include_columns(include_columns) {}
+ void operator()() {
+ for (auto sp_ext_id_reg :
+ {std::shared_ptr<ExtensionIdRegistry>(), MakeExtensionIdRegistry()}) {
Review Comment:
Why are we testing two different registries here? Is there a reason to
expect the extension ID registry to affect emit behavior?
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -121,6 +155,78 @@ inline compute::Expression UseBoringRefs(const
compute::Expression& expr) {
return compute::Expression{std::move(modified_call)};
}
+// TODO: complete this interface
+struct TempDataGenerator {
+ TempDataGenerator(const std::shared_ptr<Table> input_table,
+ const std::string& file_prefix,
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir)
+ : input_table(input_table), file_prefix(file_prefix), tempdir(tempdir) {}
+
+ Status operator()() {
+ auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+ auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+ const std::string file_name = file_prefix + ".parquet";
+
+ ARROW_ASSIGN_OR_RAISE(auto file_path, tempdir->path().Join(file_name));
+ data_file_path = file_path.ToString();
+
+ std::string toReplace("/T//");
+ size_t pos = data_file_path.find(toReplace);
+ data_file_path.replace(pos, toReplace.length(), "/T/");
+
+ ARROW_EXPECT_OK(WriteParquetData(data_file_path, filesystem, input_table));
+ return Status::OK();
+ }
+
+ std::shared_ptr<Table> input_table;
+ std::string file_prefix;
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir;
+ std::string data_file_path;
+};
+
+struct EmitValidate {
+ EmitValidate(const std::shared_ptr<Schema> output_schema,
Review Comment:
Regarding the naming, I think `CheckEmit` or `EmitRoundTrip` would be more
consistent with naming we have elsewhere. That being said, are you really
validating the emit here? It looks like you are mostly running the plan and
ensuring the resulting table equals the expected table. That seems like a more
generic capability that isn't necessarily related to emit (though could
certainly be used in emit testing).
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1814,5 +1920,1049 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, ProjectRel) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_project_tempdir"));
Review Comment:
Given that you are creating the input table like this, can you use a named
table provider and avoid writing the table to a temporary file? That should
help cut out some of the complexity.
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -121,6 +155,78 @@ inline compute::Expression UseBoringRefs(const
compute::Expression& expr) {
return compute::Expression{std::move(modified_call)};
}
+// TODO: complete this interface
+struct TempDataGenerator {
+ TempDataGenerator(const std::shared_ptr<Table> input_table,
+ const std::string& file_prefix,
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir)
+ : input_table(input_table), file_prefix(file_prefix), tempdir(tempdir) {}
+
+ Status operator()() {
+ auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
+ auto filesystem = std::make_shared<fs::LocalFileSystem>();
+
+ const std::string file_name = file_prefix + ".parquet";
+
+ ARROW_ASSIGN_OR_RAISE(auto file_path, tempdir->path().Join(file_name));
+ data_file_path = file_path.ToString();
+
+ std::string toReplace("/T//");
+ size_t pos = data_file_path.find(toReplace);
+ data_file_path.replace(pos, toReplace.length(), "/T/");
+
+ ARROW_EXPECT_OK(WriteParquetData(data_file_path, filesystem, input_table));
+ return Status::OK();
+ }
+
+ std::shared_ptr<Table> input_table;
+ std::string file_prefix;
+ std::unique_ptr<arrow::internal::TemporaryDir>& tempdir;
+ std::string data_file_path;
+};
+
+struct EmitValidate {
Review Comment:
Looking at the usage, I think this would be better as a function than a
callable struct...
```
void EmitValidate(const std::shared_ptr<Schema> output_schema, const
std::shared_ptr<Table> expected_table, compute::ExecContext& exec_context,
std::shared_ptr<Buffer>& buf, const std::vector<int>& include_columns = {}) {
for (auto sp_ext_id_reg : {std::shared_ptr<ExtensionIdRegistry>(),
MakeExtensionIdRegistry()}) {
// ...
}
}
```
Then to call it...
```
EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf);
```
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1814,5 +1920,1049 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, ProjectRel) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_project_tempdir"));
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "project": {
+ "expressions": [{
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ ],
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("equal",
boolean())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 1, 10, true],
+ [3, 5, 20, false],
+ [4, 1, 30, false],
+ [2, 1, 40, false],
+ [5, 5, 50, true],
+ [2, 2, 60, true]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
Review Comment:
To echo my point above regarding the naming of this function, this plan
doesn't even contain an emit, so calling something named `EmitValidate` seems a
bit odd.
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1814,5 +1920,1049 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, ProjectRel) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_project_tempdir"));
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "project": {
+ "expressions": [{
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ ],
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("equal",
boolean())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 1, 10, true],
+ [3, 5, 20, false],
+ [4, 1, 30, false],
+ [2, 1, 40, false],
+ [5, 5, 50, true],
+ [2, 2, 60, true]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, ProjectRelOnFunctionWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_emit_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir, arrow::internal::TemporaryDir::Make(
+ "substrait_project_emit_tempdir"));
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [0, 2, 3]
+ }
+ },
+ "expressions": [{
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ ],
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema =
+ schema({field("A", int32()), field("C", int32()), field("equal",
boolean())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 10, true],
+ [3, 20, false],
+ [4, 30, false],
+ [2, 40, false],
+ [5, 50, true],
+ [2, 60, true]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, ReadRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 4, 20]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_read_tempdir"));
+ std::string file_prefix = "serde_read_emit_test";
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "read": {
+ "common": {
+ "emit": {
+ "outputMapping": [1, 2]
+ }
+ },
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" + substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }],
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("B", int32()), field("C", int32())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 10],
+ [4, 20]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, FilterRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [10, 1, 80, 7],
+ [20, 2, 70, 6],
+ [30, 3, 30, 5],
+ [40, 4, 20, 4],
+ [40, 5, 40, 3],
+ [20, 6, 20, 2],
+ [30, 7, 30, 1]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_read_tempdir"));
+ std::string file_prefix = "serde_read_emit_test";
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "filter": {
+ "common": {
+ "emit": {
+ "outputMapping": [1, 3]
+ }
+ },
+ "condition": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 2
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ },{
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("B", int32()), field("D", int32())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [3, 5],
+ [5, 3],
+ [6, 2],
+ [7, 1]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, JoinRelEndToEnd) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto left_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ auto right_schema = schema({field("X", int32()), field("Y", int32()),
+ field("Z", int32()), field("W", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto left_table = TableFromJSON(left_schema, {R"([
+ [10, 1, 80, 70],
+ [20, 2, 70, 60],
+ [30, 3, 30, 50]
+ ])"});
+
+ auto right_table = TableFromJSON(right_schema, {R"([
+ [10, 1, 81, 71],
+ [80, 2, 71, 61],
+ [31, 3, 31, 51]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_join_tempdir"));
+ std::string left_file_prefix = "serde_join_left_emit_test";
+ std::string right_file_prefix = "serde_join_right_emit_test";
+
+ TempDataGenerator datagen_left(left_table, left_file_prefix, tempdir);
+ ASSERT_OK(datagen_left());
+ std::string substrait_left_file_uri = "file://" +
datagen_left.data_file_path;
+
+ TempDataGenerator datagen_right(right_table, right_file_prefix, tempdir);
+ ASSERT_OK(datagen_right());
+ std::string substrait_right_file_uri = "file://" +
datagen_right.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "join": {
+ "left": {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_left_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "right": {
+ "read": {
+ "base_schema": {
+ "names": ["X", "Y", "Z", "W"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_right_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "expression": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "type": "JOIN_TYPE_INNER"
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({
+ field("A", int32()),
+ field("B", int32()),
+ field("C", int32()),
+ field("D", int32()),
+ field("__fragment_index_l", int32()),
+ field("__batch_index_l", int32()),
+ field("__last_in_fragment_l", boolean()),
+ field("__filename_l", utf8()),
Review Comment:
If we use named table providers instead of files, these extra fields will go
away, which will be nice.
##########
cpp/src/arrow/engine/substrait/serde_test.cc:
##########
@@ -1814,5 +1920,1049 @@ TEST(Substrait, AggregateBadPhase) {
ASSERT_RAISES(NotImplemented, DeserializePlans(*buf, [] { return
kNullConsumer; }));
}
+TEST(Substrait, ProjectRel) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_project_tempdir"));
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "project": {
+ "expressions": [{
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ ],
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("equal",
boolean())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 1, 10, true],
+ [3, 5, 20, false],
+ [4, 1, 30, false],
+ [2, 1, 40, false],
+ [5, 5, 50, true],
+ [2, 2, 60, true]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, ProjectRelOnFunctionWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 5, 20],
+ [4, 1, 30],
+ [2, 1, 40],
+ [5, 5, 50],
+ [2, 2, 60]
+ ])"});
+
+ std::string file_prefix = "serde_project_emit_test";
+ ASSERT_OK_AND_ASSIGN(auto tempdir, arrow::internal::TemporaryDir::Make(
+ "substrait_project_emit_tempdir"));
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "project": {
+ "common": {
+ "emit": {
+ "outputMapping": [0, 2, 3]
+ }
+ },
+ "expressions": [{
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 1
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ ],
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema =
+ schema({field("A", int32()), field("C", int32()), field("equal",
boolean())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 10, true],
+ [3, 20, false],
+ [4, 30, false],
+ [2, 40, false],
+ [5, 50, true],
+ [2, 60, true]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, ReadRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema =
+ schema({field("A", int32()), field("B", int32()), field("C", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [1, 1, 10],
+ [3, 4, 20]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_read_tempdir"));
+ std::string file_prefix = "serde_read_emit_test";
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "read": {
+ "common": {
+ "emit": {
+ "outputMapping": [1, 2]
+ }
+ },
+ "base_schema": {
+ "names": ["A", "B", "C"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" + substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }],
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("B", int32()), field("C", int32())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [1, 10],
+ [4, 20]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, FilterRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto dummy_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto input_table = TableFromJSON(dummy_schema, {R"([
+ [10, 1, 80, 7],
+ [20, 2, 70, 6],
+ [30, 3, 30, 5],
+ [40, 4, 20, 4],
+ [40, 5, 40, 3],
+ [20, 6, 20, 2],
+ [30, 7, 30, 1]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_read_tempdir"));
+ std::string file_prefix = "serde_read_emit_test";
+
+ TempDataGenerator datagen(input_table, file_prefix, tempdir);
+ ASSERT_OK(datagen());
+ std::string substrait_file_uri = "file://" + datagen.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "filter": {
+ "common": {
+ "emit": {
+ "outputMapping": [1, 3]
+ }
+ },
+ "condition": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 2
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "input" : {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ },{
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ }
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({field("B", int32()), field("D", int32())});
+ auto expected_table = TableFromJSON(output_schema, {R"([
+ [3, 5],
+ [5, 3],
+ [6, 2],
+ [7, 1]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),
exec_context, buf)();
+}
+
+TEST(Substrait, JoinRelEndToEnd) {  // end-to-end: inner join over two Parquet-backed reads, checked via EmitValidate
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;  // handed by reference to EmitValidate at the end of the test
+ auto left_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ auto right_schema = schema({field("X", int32()), field("Y", int32()),
+ field("Z", int32()), field("W", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto left_table = TableFromJSON(left_schema, {R"([
+ [10, 1, 80, 70],
+ [20, 2, 70, 60],
+ [30, 3, 30, 50]
+ ])"});
+
+ auto right_table = TableFromJSON(right_schema, {R"([
+ [10, 1, 81, 71],
+ [80, 2, 71, 61],
+ [31, 3, 31, 51]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,  // temporary directory that receives both Parquet input files
+
arrow::internal::TemporaryDir::Make("substrait_join_tempdir"));
+ std::string left_file_prefix = "serde_join_left_emit_test";  // file name stem; TempDataGenerator appends .parquet
+ std::string right_file_prefix = "serde_join_right_emit_test";  // file name stem; TempDataGenerator appends .parquet
+
+ TempDataGenerator datagen_left(left_table, left_file_prefix, tempdir);
+ ASSERT_OK(datagen_left());  // writes left_table under tempdir and records the path in data_file_path
+ std::string substrait_left_file_uri = "file://" +
datagen_left.data_file_path;
+
+ TempDataGenerator datagen_right(right_table, right_file_prefix, tempdir);
+ ASSERT_OK(datagen_right());  // writes right_table under tempdir and records the path in data_file_path
+ std::string substrait_right_file_uri = "file://" +
datagen_right.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "join": {
+ "left": {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_left_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "right": {
+ "read": {
+ "base_schema": {
+ "names": ["X", "Y", "Z", "W"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_right_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "expression": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "type": "JOIN_TYPE_INNER"
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));  // plan above: JOIN_TYPE_INNER of the two file reads, keyed by an equal call whose two arguments each select field 0
+ auto output_schema = schema({  // full 16-field output schema handed to EmitValidate
+ field("A", int32()),
+ field("B", int32()),
+ field("C", int32()),
+ field("D", int32()),
+ field("__fragment_index_l", int32()),  // indices 4-7: left-side __* fields, not selected by include_columns
+ field("__batch_index_l", int32()),
+ field("__last_in_fragment_l", boolean()),
+ field("__filename_l", utf8()),
+ field("X", int32()),
+ field("Y", int32()),
+ field("Z", int32()),
+ field("W", int32()),
+ field("__fragment_index_r", int32()),  // indices 12-15: right-side __* fields, likewise not selected
+ field("__batch_index_r", int32()),
+ field("__last_in_fragment_r", boolean()),
+ field("__filename_r", utf8()),
+ });
+
+ // include these columns for comparison
+ std::vector<int> include_columns{0, 1, 2, 3, 8, 9, 10, 11};  // positions of A-D and X, Y, Z, W within output_schema
+ auto compared_output_schema = schema({  // schema of expected_table: only the eight selected data columns
+ field("A", int32()),
+ field("B", int32()),
+ field("C", int32()),
+ field("D", int32()),
+ field("X", int32()),
+ field("Y", int32()),
+ field("Z", int32()),
+ field("W", int32()),
+ });  // expected_table below has one row: 10 is the only value shared by column A (left) and column X (right)
+ auto expected_table = TableFromJSON(std::move(compared_output_schema), {R"([
+ [10, 1, 80, 70, 10, 1, 81, 71]
+ ])"});
+ EmitValidate(std::move(output_schema), std::move(expected_table),  // builds the validator functor
exec_context, buf,
+ std::move(include_columns))();  // the trailing () invokes EmitValidate::operator()
+}
+
+TEST(Substrait, JoinRelWithEmit) {
+#ifdef _WIN32
+ GTEST_SKIP() << "ARROW-16392: Substrait File URI not supported for Windows";
+#endif
+ compute::ExecContext exec_context;
+ auto left_schema = schema({field("A", int32()), field("B", int32()),
+ field("C", int32()), field("D", int32())});
+
+ auto right_schema = schema({field("X", int32()), field("Y", int32()),
+ field("Z", int32()), field("W", int32())});
+
+ // creating a dummy dataset using a dummy table
+ auto left_table = TableFromJSON(left_schema, {R"([
+ [10, 1, 80, 70],
+ [20, 2, 70, 60],
+ [30, 3, 30, 50]
+ ])"});
+
+ auto right_table = TableFromJSON(right_schema, {R"([
+ [10, 1, 81, 71],
+ [80, 2, 71, 61],
+ [31, 3, 31, 51]
+ ])"});
+
+ ASSERT_OK_AND_ASSIGN(auto tempdir,
+
arrow::internal::TemporaryDir::Make("substrait_join_tempdir"));
+ std::string left_file_prefix = "serde_join_left_emit_test";
+ std::string right_file_prefix = "serde_join_right_emit_test";
+
+ TempDataGenerator datagen_left(left_table, left_file_prefix, tempdir);
+ ASSERT_OK(datagen_left());
+ std::string substrait_left_file_uri = "file://" +
datagen_left.data_file_path;
+
+ TempDataGenerator datagen_right(right_table, right_file_prefix, tempdir);
+ ASSERT_OK(datagen_right());
+ std::string substrait_right_file_uri = "file://" +
datagen_right.data_file_path;
+
+ std::string substrait_json = R"({
+ "relations": [{
+ "rel": {
+ "join": {
+ "common": {
+ "emit": {
+ "outputMapping": [0, 1, 2, 3, 8, 9, 10, 11]
+ }
+ },
+ "left": {
+ "read": {
+ "base_schema": {
+ "names": ["A", "B", "C", "D"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_left_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "right": {
+ "read": {
+ "base_schema": {
+ "names": ["X", "Y", "Z", "W"],
+ "struct": {
+ "types": [{
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }, {
+ "i32": {}
+ }]
+ }
+ },
+ "local_files": {
+ "items": [
+ {
+ "uri_file": ")" +
+ substrait_right_file_uri +
+ R"(",
+ "parquet": {}
+ }
+ ]
+ }
+ }
+ },
+ "expression": {
+ "scalarFunction": {
+ "functionReference": 0,
+ "arguments": [{
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }, {
+ "value": {
+ "selection": {
+ "directReference": {
+ "structField": {
+ "field": 0
+ }
+ },
+ "rootReference": {
+ }
+ }
+ }
+ }],
+ "output_type": {
+ "bool": {}
+ }
+ }
+ },
+ "type": "JOIN_TYPE_INNER"
+ }
+ }
+ }],
+ "extension_uris": [
+ {
+ "extension_uri_anchor": 0,
+ "uri": ")" + std::string(kSubstraitComparisonFunctionsUri) +
+ R"("
+ }
+ ],
+ "extensions": [
+ {"extension_function": {
+ "extension_uri_reference": 0,
+ "function_anchor": 0,
+ "name": "equal"
+ }}
+ ]
+ })";
+
+ ASSERT_OK_AND_ASSIGN(auto buf, internal::SubstraitFromJSON("Plan",
substrait_json));
+ auto output_schema = schema({
+ field("A", int32()),
+ field("B", int32()),
+ field("C", int32()),
+ field("D", int32()),
+ field("X", int32()),
+ field("Y", int32()),
+ field("Z", int32()),
+ field("W", int32()),
+ });
Review Comment:
I think these join tests are made complicated by having so many fields.
It's probably ok to have just 1-2 fields in each input.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]