This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 01dc5e6abaf [SPARK-42786][CONNECT] Typed Select
01dc5e6abaf is described below

commit 01dc5e6abafeb0bee2049d3e9da73fb89703d958
Author: Zhen Li <[email protected]>
AuthorDate: Tue Mar 21 23:34:08 2023 -0400

    [SPARK-42786][CONNECT] Typed Select
    
    ### What changes were proposed in this pull request?
    Implement typed select methods in the Dataset.
    
    ### Why are the changes needed?
    More APIs
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit and E2E tests
    
    Closes #40413 from zhenlineo/select-typed.
    
    Authored-by: Zhen Li <[email protected]>
    Signed-off-by: Herman van Hovell <[email protected]>
---
 .../main/scala/org/apache/spark/sql/Dataset.scala  |  79 ++++++++++++++++++--
 .../org/apache/spark/sql/ClientE2ETestSuite.scala  |  31 +++++++-
 .../apache/spark/sql/PlanGenerationTestSuite.scala |  35 +++++++++
 .../CheckConnectJvmClientCompatibility.scala       |   3 +-
 .../explain-results/select_typed_2-arg.explain     |   2 +
 .../explain-results/select_typed_3-arg.explain     |   2 +
 .../explain-results/select_typed_4-arg.explain     |   2 +
 .../explain-results/select_typed_5-arg.explain     |   2 +
 .../query-tests/queries/select_typed_2-arg.json    |  42 +++++++++++
 .../queries/select_typed_2-arg.proto.bin           | Bin 0 -> 101 bytes
 .../query-tests/queries/select_typed_3-arg.json    |  55 ++++++++++++++
 .../queries/select_typed_3-arg.proto.bin           | Bin 0 -> 128 bytes
 .../query-tests/queries/select_typed_4-arg.json    |  68 +++++++++++++++++
 .../queries/select_typed_4-arg.proto.bin           | Bin 0 -> 157 bytes
 .../query-tests/queries/select_typed_5-arg.json    |  81 +++++++++++++++++++++
 .../queries/select_typed_5-arg.proto.bin           | Bin 0 -> 182 bytes
 .../sql/catalyst/encoders/AgnosticEncoder.scala    |  14 ++++
 17 files changed, 407 insertions(+), 9 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
index fdc994b2d90..cc9f66a8ba0 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -25,7 +25,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.connect.proto
 import org.apache.spark.sql.catalyst.encoders.AgnosticEncoder
-import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, 
StringEncoder, UnboundRowEncoder}
+import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.{PrimitiveLongEncoder, 
ProductEncoder, StringEncoder, UnboundRowEncoder}
 import org.apache.spark.sql.catalyst.expressions.RowOrdering
 import org.apache.spark.sql.connect.client.SparkResult
 import org.apache.spark.sql.connect.common.DataTypeProtoConverter
@@ -1020,11 +1020,8 @@ class Dataset[T] private[sql] (
    * @since 3.4.0
    */
   @scala.annotation.varargs
-  def select(cols: Column*): DataFrame = sparkSession.newDataFrame { builder =>
-    builder.getProjectBuilder
-      .setInput(plan.getRoot)
-      .addAllExpressions(cols.map(_.expr).asJava)
-  }
+  def select(cols: Column*): DataFrame =
+    selectUntyped(UnboundRowEncoder, cols).asInstanceOf[DataFrame]
 
   /**
    * Selects a set of columns. This is a variant of `select` that can only 
select existing columns
@@ -1084,6 +1081,76 @@ class Dataset[T] private[sql] (
     }
   }
 
+  /**
+   * Internal helper function for building typed selects that return tuples. 
For simplicity and
+   * code reuse, we do this without the help of the type system and then use 
helper functions that
+   * cast appropriately for the user facing interface.
+   */
+  private def selectUntyped(columns: TypedColumn[_, _]*): Dataset[_] = {
+    val encoder = ProductEncoder.tuple(columns.map(_.encoder))
+    selectUntyped(encoder, columns)
+  }
+
+  /**
+   * Internal helper function for all select methods. The only difference 
between the select
+   * methods and typed select methods is the encoder used to build the return 
dataset.
+   */
+  private def selectUntyped(encoder: AgnosticEncoder[_], cols: Seq[Column]): 
Dataset[_] = {
+    sparkSession.newDataset(encoder) { builder =>
+      builder.getProjectBuilder
+        .setInput(plan.getRoot)
+        .addAllExpressions(cols.map(_.expr).asJava)
+    }
+  }
+
+  /**
+   * Returns a new Dataset by computing the given [[Column]] expressions for 
each element.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def select[U1, U2](c1: TypedColumn[T, U1], c2: TypedColumn[T, U2]): 
Dataset[(U1, U2)] =
+    selectUntyped(c1, c2).asInstanceOf[Dataset[(U1, U2)]]
+
+  /**
+   * Returns a new Dataset by computing the given [[Column]] expressions for 
each element.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def select[U1, U2, U3](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3]): Dataset[(U1, U2, U3)] =
+    selectUntyped(c1, c2, c3).asInstanceOf[Dataset[(U1, U2, U3)]]
+
+  /**
+   * Returns a new Dataset by computing the given [[Column]] expressions for 
each element.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def select[U1, U2, U3, U4](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3],
+      c4: TypedColumn[T, U4]): Dataset[(U1, U2, U3, U4)] =
+    selectUntyped(c1, c2, c3, c4).asInstanceOf[Dataset[(U1, U2, U3, U4)]]
+
+  /**
+   * Returns a new Dataset by computing the given [[Column]] expressions for 
each element.
+   *
+   * @group typedrel
+   * @since 3.4.0
+   */
+  def select[U1, U2, U3, U4, U5](
+      c1: TypedColumn[T, U1],
+      c2: TypedColumn[T, U2],
+      c3: TypedColumn[T, U3],
+      c4: TypedColumn[T, U4],
+      c5: TypedColumn[T, U5]): Dataset[(U1, U2, U3, U4, U5)] =
+    selectUntyped(c1, c2, c3, c4, c5).asInstanceOf[Dataset[(U1, U2, U3, U4, 
U5)]]
+
   /**
    * Filters rows using the given condition.
    * {{{
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 605b15123c6..0e5784326ad 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -33,7 +33,7 @@ import 
org.apache.spark.sql.catalyst.encoders.AgnosticEncoders.StringEncoder
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, 
RemoteSparkSession}
-import org.apache.spark.sql.functions.{aggregate, array, broadcast, col, 
count, lit, rand, sequence, shuffle, struct, transform, udf}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 
 class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper {
@@ -486,6 +486,19 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
     }
   }
 
+  private def validateMyTypeResult(result: Array[(MyType, MyType, MyType)]): 
Unit = {
+    result.zipWithIndex.foreach { case (row, i) =>
+      val t1 = row._1
+      val t2 = row._2
+      val t3 = row._3
+      assert(t1 === t2)
+      assert(t2 === t3)
+      assert(t1.id == i)
+      assert(t1.a == t1.id % 2)
+      assert(t1.b == t1.id / 10.0d)
+    }
+  }
+
   test("Dataset collect complex type") {
     val session = spark
     import session.implicits._
@@ -502,6 +515,12 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
     assert(numRows === 1000)
   }
 
+  test("Dataset typed select - multiple columns") {
+    val result = spark.range(1000).select(count("id"), sum("id")).first()
+    assert(result.getLong(0) === 1000)
+    assert(result.getLong(1) === 499500)
+  }
+
   test("Dataset typed select - complex column") {
     val session = spark
     import session.implicits._
@@ -511,6 +530,16 @@ class ClientE2ETestSuite extends RemoteSparkSession with 
SQLHelper {
     validateMyTypeResult(ds.collect())
   }
 
+  test("Dataset typed select - multiple complex columns") {
+    val session = spark
+    import session.implicits._
+    val s = struct(generateMyTypeColumns: _*).as[MyType]
+    val ds = session
+      .range(3)
+      .select(s, s, s)
+    validateMyTypeResult(ds.collect())
+  }
+
   test("lambda functions") {
     // This test is mostly to validate lambda variables are properly resolved.
     val result = spark
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
index 95d6fddc97c..cb62977d937 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -307,6 +307,41 @@ class PlanGenerationTestSuite
     simple.select(fn.struct(fn.col("id"), fn.col("a")).as(encoder))
   }
 
+  test("select typed 2-arg") {
+    val encoder = ScalaReflection.encoderFor[(Long, Int)]
+    val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+    val col = fn.struct(fn.col("id"), fn.col("a"))
+    val col2 = fn.struct(fn.col("a"), fn.col("b"))
+    simple.select(col.as(encoder), col2.as(encoder2))
+  }
+
+  test("select typed 3-arg") {
+    val encoder = ScalaReflection.encoderFor[(Long, Int)]
+    val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+    val col = fn.struct(fn.col("id"), fn.col("a"))
+    val col2 = fn.struct(fn.col("a"), fn.col("b"))
+    simple.select(col.as(encoder), col2.as(encoder2), col.as(encoder))
+  }
+
+  test("select typed 4-arg") {
+    val encoder = ScalaReflection.encoderFor[(Long, Int)]
+    val col = fn.struct(fn.col("id"), fn.col("a"))
+    simple.select(col.as(encoder), col.as(encoder), col.as(encoder), 
col.as(encoder))
+  }
+
+  test("select typed 5-arg") {
+    val encoder = ScalaReflection.encoderFor[(Long, Int)]
+    val encoder2 = ScalaReflection.encoderFor[(Double, Double)]
+    val col = fn.struct(fn.col("id"), fn.col("a"))
+    val col2 = fn.struct(fn.col("a"), fn.col("b"))
+    simple.select(
+      col.as(encoder),
+      col2.as(encoder2),
+      col.as(encoder),
+      col.as(encoder),
+      col2.as(encoder2))
+  }
+
   test("limit") {
     simple.limit(10)
   }
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index a2b4762f0a9..f62841c05cb 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -150,8 +150,7 @@ object CheckConnectJvmClientCompatibility {
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.encoder"),
       
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.sqlContext"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.joinWith"),
-      ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.select"),
-      
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"),
+      
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.selectUntyped"), 
// protected
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.reduce"),
       
ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.groupByKey"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.Dataset.explode"), 
// deprecated
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain
new file mode 100644
index 00000000000..324519c6853
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_2-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) 
AS struct(a, b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain
new file mode 100644
index 00000000000..403c9346c17
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_3-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) 
AS struct(a, b)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain
new file mode 100644
index 00000000000..714ed57fff1
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_4-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, id#0L, a, 
a#0) AS struct(id, a)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, 
struct(id, id#0L, a, a#0) AS struct(id, a)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain
 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain
new file mode 100644
index 00000000000..b11439319a1
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/explain-results/select_typed_5-arg.explain
@@ -0,0 +1,2 @@
+Project [struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) 
AS struct(a, b)#0, struct(id, id#0L, a, a#0) AS struct(id, a)#0, struct(id, 
id#0L, a, a#0) AS struct(id, a)#0, struct(a, a#0, b, b#0) AS struct(a, b)#0]
++- LocalRelation <empty>, [id#0L, a#0, b#0]
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json
new file mode 100644
index 00000000000..c9c6c752356
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.json
@@ -0,0 +1,42 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin
new file mode 100644
index 00000000000..37f3915cd8d
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_2-arg.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json
new file mode 100644
index 00000000000..23850dcb136
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.json
@@ -0,0 +1,55 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin
new file mode 100644
index 00000000000..b3b56953a85
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_3-arg.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json
new file mode 100644
index 00000000000..2bbdb60794d
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.json
@@ -0,0 +1,68 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin
new file mode 100644
index 00000000000..bacccff22ae
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_4-arg.proto.bin
 differ
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json
new file mode 100644
index 00000000000..4f57c0ef821
--- /dev/null
+++ 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.json
@@ -0,0 +1,81 @@
+{
+  "common": {
+    "planId": "1"
+  },
+  "project": {
+    "input": {
+      "common": {
+        "planId": "0"
+      },
+      "localRelation": {
+        "schema": "struct\u003cid:bigint,a:int,b:double\u003e"
+      }
+    },
+    "expressions": [{
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "id"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }]
+      }
+    }, {
+      "unresolvedFunction": {
+        "functionName": "struct",
+        "arguments": [{
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "a"
+          }
+        }, {
+          "unresolvedAttribute": {
+            "unparsedIdentifier": "b"
+          }
+        }]
+      }
+    }]
+  }
+}
\ No newline at end of file
diff --git 
a/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin
 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin
new file mode 100644
index 00000000000..2c51e208888
Binary files /dev/null and 
b/connector/connect/common/src/test/resources/query-tests/queries/select_typed_5-arg.proto.bin
 differ
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
index 24c8bad5c2f..6599916ec7f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/AgnosticEncoder.scala
@@ -19,12 +19,14 @@ package org.apache.spark.sql.catalyst.encoders
 import java.{sql => jsql}
 import java.math.{BigDecimal => JBigDecimal, BigInteger => JBigInt}
 import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.{Encoder, Row}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.CalendarInterval
+import org.apache.spark.util.Utils
 
 /**
  * A non implementation specific encoder. This encoder containers all the 
information needed
@@ -107,6 +109,18 @@ object AgnosticEncoders {
     override def dataType: DataType = schema
   }
 
+  object ProductEncoder {
+    val cachedCls = new ConcurrentHashMap[Int, Class[_]]
+    def tuple(encoders: Seq[AgnosticEncoder[_]]): AgnosticEncoder[_] = {
+      val fields = encoders.zipWithIndex.map {
+        case (e, id) => EncoderField(s"_${id + 1}", e, e.nullable, 
Metadata.empty)
+      }
+      val cls = cachedCls.computeIfAbsent(encoders.size,
+        _ => 
Utils.getContextOrSparkClassLoader.loadClass(s"scala.Tuple${encoders.size}"))
+      ProductEncoder[Any](ClassTag(cls), fields)
+    }
+  }
+
   abstract class BaseRowEncoder extends AgnosticEncoder[Row] {
     override def isPrimitive: Boolean = false
     override def dataType: DataType = schema


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to