This is an automated email from the ASF dual-hosted git repository.
hvanhovell pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new fd97de41206 [SPARK-42555][CONNECT][FOLLOWUP] Add the new proto msg to
support the remaining jdbc API
fd97de41206 is described below
commit fd97de4120690a5df16b942b0d80770285701f50
Author: Jiaan Geng <[email protected]>
AuthorDate: Wed Mar 8 10:50:00 2023 -0400
[SPARK-42555][CONNECT][FOLLOWUP] Add the new proto msg to support the
remaining jdbc API
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/40252 supported some jdbc APIs that
reuse the proto msg `DataSource`. The `DataFrameReader` also has another kind of
jdbc API that is unrelated to loading a data source.
### Why are the changes needed?
This PR adds a new repeated `predicates` field to the `DataSource` proto msg to
support the remaining jdbc API.
### Does this PR introduce _any_ user-facing change?
'No'.
New feature.
### How was this patch tested?
New test cases.
Closes #40277 from beliefer/SPARK-42555_followup.
Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit 39a55121888d2543a6056be65e0c74126a9d3bdf)
Signed-off-by: Herman van Hovell <[email protected]>
---
.../org/apache/spark/sql/DataFrameReader.scala | 42 +++++
.../apache/spark/sql/PlanGenerationTestSuite.scala | 21 +--
.../main/protobuf/spark/connect/relations.proto | 5 +
.../read_jdbc_with_predicates.explain | 1 +
.../queries/read_jdbc_with_predicates.json | 15 ++
.../queries/read_jdbc_with_predicates.proto.bin | Bin 0 -> 121 bytes
.../sql/connect/planner/SparkConnectPlanner.scala | 55 ++++--
.../sql/connect/ProtoToParsedPlanTestSuite.scala | 4 +
python/pyspark/sql/connect/proto/relations_pb2.py | 190 ++++++++++-----------
python/pyspark/sql/connect/proto/relations_pb2.pyi | 12 ++
10 files changed, 222 insertions(+), 123 deletions(-)
diff --git
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 43d6486f124..d5641fb303a 100644
---
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -250,6 +250,48 @@ class DataFrameReader private[sql] (sparkSession:
SparkSession) extends Logging
jdbc(url, table, connectionProperties)
}
+ /**
+ * Construct a `DataFrame` representing the database table accessible via
JDBC URL url named
+ * table using connection properties. The `predicates` parameter gives a
list of expressions
+ * suitable for inclusion in WHERE clauses; each one defines one partition
of the `DataFrame`.
+ *
+ * Don't create too many partitions in parallel on a large cluster;
otherwise Spark might crash
+ * your external database systems.
+ *
+ * You can find the JDBC-specific option and parameter documentation for
reading tables via JDBC
+ * in <a
+ *
href="https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html#data-source-option">
+ * Data Source Option</a> in the version you use.
+ *
+ * @param table
+ * Name of the table in the external database.
+ * @param predicates
+ * Condition in the where clause for each partition.
+ * @param connectionProperties
+ * JDBC database connection arguments, a list of arbitrary string
tag/value. Normally at least
+ * a "user" and "password" property should be included. "fetchsize" can be
used to control the
+ * number of rows per fetch.
+ * @since 3.4.0
+ */
+ def jdbc(
+ url: String,
+ table: String,
+ predicates: Array[String],
+ connectionProperties: Properties): DataFrame = {
+ sparkSession.newDataFrame { builder =>
+ val dataSourceBuilder = builder.getReadBuilder.getDataSourceBuilder
+ format("jdbc")
+ dataSourceBuilder.setFormat(source)
+ predicates.foreach(predicate =>
dataSourceBuilder.addPredicates(predicate))
+ this.extraOptions ++= Seq("url" -> url, "dbtable" -> table)
+ val params = extraOptions ++ connectionProperties.asScala
+ params.foreach { case (k, v) =>
+ dataSourceBuilder.putOptions(k, v)
+ }
+ builder.build()
+ }
+ }
+
/**
* Loads a JSON file and returns the results as a `DataFrame`.
*
diff --git
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
old mode 100755
new mode 100644
index 027b7a30246..56c5111912a
---
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
+++
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/PlanGenerationTestSuite.scala
@@ -163,6 +163,8 @@ class PlanGenerationTestSuite
}
}
+ private val urlWithUserAndPass =
"jdbc:h2:mem:testdb0;user=testUser;password=testPass"
+
private val simpleSchema = new StructType()
.add("id", "long")
.add("a", "int")
@@ -236,21 +238,16 @@ class PlanGenerationTestSuite
}
test("read jdbc") {
- session.read.jdbc(
- "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
- "TEST.TIMETYPES",
- new Properties())
+ session.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties())
}
test("read jdbc with partition") {
- session.read.jdbc(
- "jdbc:h2:mem:testdb0;user=testUser;password=testPass",
- "TEST.EMP",
- "THEID",
- 0,
- 4,
- 3,
- new Properties())
+ session.read.jdbc(urlWithUserAndPass, "TEST.EMP", "THEID", 0, 4, 3, new
Properties())
+ }
+
+ test("read jdbc with predicates") {
+ val parts = Array[String]("THEID < 2", "THEID >= 2")
+ session.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new
Properties())
}
test("read json") {
diff --git
a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 2230aada4fe..ab67ade9fb7 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -140,6 +140,11 @@ message Read {
// (Optional) A list of path for file-system backed data sources.
repeated string paths = 4;
+
+ // (Optional) Condition in the where clause for each partition.
+ //
+ // This is only supported by the JDBC data source.
+ repeated string predicates = 5;
}
}
diff --git
a/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_predicates.explain
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_predicates.explain
new file mode 100644
index 00000000000..d3eb0fc7d0f
--- /dev/null
+++
b/connector/connect/common/src/test/resources/query-tests/explain-results/read_jdbc_with_predicates.explain
@@ -0,0 +1 @@
+Relation [NAME#0,THEID#0] JDBCRelation(TEST.PEOPLE) [numPartitions=2]
diff --git
a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.json
b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.json
new file mode 100644
index 00000000000..d8d4cfbdcab
--- /dev/null
+++
b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.json
@@ -0,0 +1,15 @@
+{
+ "common": {
+ "planId": "0"
+ },
+ "read": {
+ "dataSource": {
+ "format": "jdbc",
+ "options": {
+ "url": "jdbc:h2:mem:testdb0;user\u003dtestUser;password\u003dtestPass",
+ "dbtable": "TEST.PEOPLE"
+ },
+ "predicates": ["THEID \u003c 2", "THEID \u003e\u003d 2"]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.proto.bin
b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.proto.bin
new file mode 100644
index 00000000000..9b1d5812e47
Binary files /dev/null and
b/connector/connect/common/src/test/resources/query-tests/queries/read_jdbc_with_predicates.proto.bin
differ
diff --git
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 3b9443f4e3c..b51dbfa6602 100644
---
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -24,7 +24,7 @@ import com.google.common.collect.{Lists, Maps}
import com.google.protobuf.{Any => ProtoAny, ByteString}
import io.grpc.stub.StreamObserver
-import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.api.python.{PythonEvalType, SimplePythonFunction}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{ExecutePlanResponse, SqlCommand}
@@ -48,6 +48,8 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.arrow.ArrowConverters
import org.apache.spark.sql.execution.command.CreateViewCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions,
JDBCPartition, JDBCRelation}
import org.apache.spark.sql.execution.python.UserDefinedPythonFunction
import org.apache.spark.sql.internal.CatalogImpl
import org.apache.spark.sql.types._
@@ -688,25 +690,46 @@ class SparkConnectPlanner(val session: SparkSession) {
reader.format(rel.getDataSource.getFormat)
}
localMap.foreach { case (key, value) => reader.option(key, value) }
- if (rel.getDataSource.hasSchema &&
rel.getDataSource.getSchema.nonEmpty) {
-
- DataType.parseTypeWithFallback(
- rel.getDataSource.getSchema,
- StructType.fromDDL,
- fallbackParser = DataType.fromJson) match {
- case s: StructType => reader.schema(s)
- case other => throw InvalidPlanInput(s"Invalid schema $other")
+
+ if (rel.getDataSource.getFormat == "jdbc" &&
rel.getDataSource.getPredicatesCount > 0) {
+ if (!localMap.contains(JDBCOptions.JDBC_URL) ||
+ !localMap.contains(JDBCOptions.JDBC_TABLE_NAME)) {
+ throw InvalidPlanInput(s"Invalid jdbc params, please specify jdbc
url and table.")
+ }
+
+ val url = rel.getDataSource.getOptionsMap.get(JDBCOptions.JDBC_URL)
+ val table =
rel.getDataSource.getOptionsMap.get(JDBCOptions.JDBC_TABLE_NAME)
+ val options = new JDBCOptions(url, table, localMap)
+ val predicates = rel.getDataSource.getPredicatesList.asScala.toArray
+ val parts: Array[Partition] = predicates.zipWithIndex.map { case
(part, i) =>
+ JDBCPartition(part, i): Partition
+ }
+ val relation = JDBCRelation(parts, options)(session)
+ LogicalRelation(relation)
+ } else if (rel.getDataSource.getPredicatesCount == 0) {
+ if (rel.getDataSource.hasSchema &&
rel.getDataSource.getSchema.nonEmpty) {
+
+ DataType.parseTypeWithFallback(
+ rel.getDataSource.getSchema,
+ StructType.fromDDL,
+ fallbackParser = DataType.fromJson) match {
+ case s: StructType => reader.schema(s)
+ case other => throw InvalidPlanInput(s"Invalid schema $other")
+ }
+ }
+ if (rel.getDataSource.getPathsCount == 0) {
+ reader.load().queryExecution.analyzed
+ } else if (rel.getDataSource.getPathsCount == 1) {
+ reader.load(rel.getDataSource.getPaths(0)).queryExecution.analyzed
+ } else {
+ reader.load(rel.getDataSource.getPathsList.asScala.toSeq:
_*).queryExecution.analyzed
}
- }
- if (rel.getDataSource.getPathsCount == 0) {
- reader.load().queryExecution.analyzed
- } else if (rel.getDataSource.getPathsCount == 1) {
- reader.load(rel.getDataSource.getPaths(0)).queryExecution.analyzed
} else {
- reader.load(rel.getDataSource.getPathsList.asScala.toSeq:
_*).queryExecution.analyzed
+ throw InvalidPlanInput(
+ s"Predicates are not supported for ${rel.getDataSource.getFormat}
data sources.")
}
- case _ => throw InvalidPlanInput("Does not support " +
rel.getReadTypeCase.name())
+ case _ => throw InvalidPlanInput(s"Does not support
${rel.getReadTypeCase.name()}")
}
}
diff --git
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 142ae175090..e20a6159cc8 100644
---
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -74,6 +74,10 @@ class ProtoToParsedPlanTestSuite extends SparkFunSuite with
SharedSparkSession {
conn = DriverManager.getConnection(url, properties)
conn.prepareStatement("create schema test").executeUpdate()
+ conn
+ .prepareStatement(
+ "create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT
NULL)")
+ .executeUpdate()
conn
.prepareStatement("create table test.timetypes (a TIME, b DATE, c
TIMESTAMP(7))")
.executeUpdate()
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py
b/python/pyspark/sql/connect/proto/relations_pb2.py
index f38cff1990d..e577749c3ed 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -36,7 +36,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as
spark_dot_connect_dot_catal
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xfb\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+
b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xfb\x12\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01
\x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02
\x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03
\x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
)
@@ -669,101 +669,101 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_SQL_ARGSENTRY._serialized_start = 2778
_SQL_ARGSENTRY._serialized_end = 2833
_READ._serialized_start = 2836
- _READ._serialized_end = 3300
+ _READ._serialized_end = 3332
_READ_NAMEDTABLE._serialized_start = 2978
_READ_NAMEDTABLE._serialized_end = 3039
_READ_DATASOURCE._serialized_start = 3042
- _READ_DATASOURCE._serialized_end = 3287
- _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3207
- _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3265
- _PROJECT._serialized_start = 3302
- _PROJECT._serialized_end = 3419
- _FILTER._serialized_start = 3421
- _FILTER._serialized_end = 3533
- _JOIN._serialized_start = 3536
- _JOIN._serialized_end = 4007
- _JOIN_JOINTYPE._serialized_start = 3799
- _JOIN_JOINTYPE._serialized_end = 4007
- _SETOPERATION._serialized_start = 4010
- _SETOPERATION._serialized_end = 4489
- _SETOPERATION_SETOPTYPE._serialized_start = 4326
- _SETOPERATION_SETOPTYPE._serialized_end = 4440
- _LIMIT._serialized_start = 4491
- _LIMIT._serialized_end = 4567
- _OFFSET._serialized_start = 4569
- _OFFSET._serialized_end = 4648
- _TAIL._serialized_start = 4650
- _TAIL._serialized_end = 4725
- _AGGREGATE._serialized_start = 4728
- _AGGREGATE._serialized_end = 5310
- _AGGREGATE_PIVOT._serialized_start = 5067
- _AGGREGATE_PIVOT._serialized_end = 5178
- _AGGREGATE_GROUPTYPE._serialized_start = 5181
- _AGGREGATE_GROUPTYPE._serialized_end = 5310
- _SORT._serialized_start = 5313
- _SORT._serialized_end = 5473
- _DROP._serialized_start = 5476
- _DROP._serialized_end = 5617
- _DEDUPLICATE._serialized_start = 5620
- _DEDUPLICATE._serialized_end = 5791
- _LOCALRELATION._serialized_start = 5793
- _LOCALRELATION._serialized_end = 5882
- _SAMPLE._serialized_start = 5885
- _SAMPLE._serialized_end = 6158
- _RANGE._serialized_start = 6161
- _RANGE._serialized_end = 6306
- _SUBQUERYALIAS._serialized_start = 6308
- _SUBQUERYALIAS._serialized_end = 6422
- _REPARTITION._serialized_start = 6425
- _REPARTITION._serialized_end = 6567
- _SHOWSTRING._serialized_start = 6570
- _SHOWSTRING._serialized_end = 6712
- _STATSUMMARY._serialized_start = 6714
- _STATSUMMARY._serialized_end = 6806
- _STATDESCRIBE._serialized_start = 6808
- _STATDESCRIBE._serialized_end = 6889
- _STATCROSSTAB._serialized_start = 6891
- _STATCROSSTAB._serialized_end = 6992
- _STATCOV._serialized_start = 6994
- _STATCOV._serialized_end = 7090
- _STATCORR._serialized_start = 7093
- _STATCORR._serialized_end = 7230
- _STATAPPROXQUANTILE._serialized_start = 7233
- _STATAPPROXQUANTILE._serialized_end = 7397
- _STATFREQITEMS._serialized_start = 7399
- _STATFREQITEMS._serialized_end = 7524
- _STATSAMPLEBY._serialized_start = 7527
- _STATSAMPLEBY._serialized_end = 7836
- _STATSAMPLEBY_FRACTION._serialized_start = 7728
- _STATSAMPLEBY_FRACTION._serialized_end = 7827
- _NAFILL._serialized_start = 7839
- _NAFILL._serialized_end = 7973
- _NADROP._serialized_start = 7976
- _NADROP._serialized_end = 8110
- _NAREPLACE._serialized_start = 8113
- _NAREPLACE._serialized_end = 8409
- _NAREPLACE_REPLACEMENT._serialized_start = 8268
- _NAREPLACE_REPLACEMENT._serialized_end = 8409
- _TODF._serialized_start = 8411
- _TODF._serialized_end = 8499
- _WITHCOLUMNSRENAMED._serialized_start = 8502
- _WITHCOLUMNSRENAMED._serialized_end = 8741
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8674
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8741
- _WITHCOLUMNS._serialized_start = 8743
- _WITHCOLUMNS._serialized_end = 8862
- _HINT._serialized_start = 8865
- _HINT._serialized_end = 8997
- _UNPIVOT._serialized_start = 9000
- _UNPIVOT._serialized_end = 9327
- _UNPIVOT_VALUES._serialized_start = 9257
- _UNPIVOT_VALUES._serialized_end = 9316
- _TOSCHEMA._serialized_start = 9329
- _TOSCHEMA._serialized_end = 9435
- _REPARTITIONBYEXPRESSION._serialized_start = 9438
- _REPARTITIONBYEXPRESSION._serialized_end = 9641
- _FRAMEMAP._serialized_start = 9643
- _FRAMEMAP._serialized_end = 9768
- _COLLECTMETRICS._serialized_start = 9771
- _COLLECTMETRICS._serialized_end = 9907
+ _READ_DATASOURCE._serialized_end = 3319
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 3239
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 3297
+ _PROJECT._serialized_start = 3334
+ _PROJECT._serialized_end = 3451
+ _FILTER._serialized_start = 3453
+ _FILTER._serialized_end = 3565
+ _JOIN._serialized_start = 3568
+ _JOIN._serialized_end = 4039
+ _JOIN_JOINTYPE._serialized_start = 3831
+ _JOIN_JOINTYPE._serialized_end = 4039
+ _SETOPERATION._serialized_start = 4042
+ _SETOPERATION._serialized_end = 4521
+ _SETOPERATION_SETOPTYPE._serialized_start = 4358
+ _SETOPERATION_SETOPTYPE._serialized_end = 4472
+ _LIMIT._serialized_start = 4523
+ _LIMIT._serialized_end = 4599
+ _OFFSET._serialized_start = 4601
+ _OFFSET._serialized_end = 4680
+ _TAIL._serialized_start = 4682
+ _TAIL._serialized_end = 4757
+ _AGGREGATE._serialized_start = 4760
+ _AGGREGATE._serialized_end = 5342
+ _AGGREGATE_PIVOT._serialized_start = 5099
+ _AGGREGATE_PIVOT._serialized_end = 5210
+ _AGGREGATE_GROUPTYPE._serialized_start = 5213
+ _AGGREGATE_GROUPTYPE._serialized_end = 5342
+ _SORT._serialized_start = 5345
+ _SORT._serialized_end = 5505
+ _DROP._serialized_start = 5508
+ _DROP._serialized_end = 5649
+ _DEDUPLICATE._serialized_start = 5652
+ _DEDUPLICATE._serialized_end = 5823
+ _LOCALRELATION._serialized_start = 5825
+ _LOCALRELATION._serialized_end = 5914
+ _SAMPLE._serialized_start = 5917
+ _SAMPLE._serialized_end = 6190
+ _RANGE._serialized_start = 6193
+ _RANGE._serialized_end = 6338
+ _SUBQUERYALIAS._serialized_start = 6340
+ _SUBQUERYALIAS._serialized_end = 6454
+ _REPARTITION._serialized_start = 6457
+ _REPARTITION._serialized_end = 6599
+ _SHOWSTRING._serialized_start = 6602
+ _SHOWSTRING._serialized_end = 6744
+ _STATSUMMARY._serialized_start = 6746
+ _STATSUMMARY._serialized_end = 6838
+ _STATDESCRIBE._serialized_start = 6840
+ _STATDESCRIBE._serialized_end = 6921
+ _STATCROSSTAB._serialized_start = 6923
+ _STATCROSSTAB._serialized_end = 7024
+ _STATCOV._serialized_start = 7026
+ _STATCOV._serialized_end = 7122
+ _STATCORR._serialized_start = 7125
+ _STATCORR._serialized_end = 7262
+ _STATAPPROXQUANTILE._serialized_start = 7265
+ _STATAPPROXQUANTILE._serialized_end = 7429
+ _STATFREQITEMS._serialized_start = 7431
+ _STATFREQITEMS._serialized_end = 7556
+ _STATSAMPLEBY._serialized_start = 7559
+ _STATSAMPLEBY._serialized_end = 7868
+ _STATSAMPLEBY_FRACTION._serialized_start = 7760
+ _STATSAMPLEBY_FRACTION._serialized_end = 7859
+ _NAFILL._serialized_start = 7871
+ _NAFILL._serialized_end = 8005
+ _NADROP._serialized_start = 8008
+ _NADROP._serialized_end = 8142
+ _NAREPLACE._serialized_start = 8145
+ _NAREPLACE._serialized_end = 8441
+ _NAREPLACE_REPLACEMENT._serialized_start = 8300
+ _NAREPLACE_REPLACEMENT._serialized_end = 8441
+ _TODF._serialized_start = 8443
+ _TODF._serialized_end = 8531
+ _WITHCOLUMNSRENAMED._serialized_start = 8534
+ _WITHCOLUMNSRENAMED._serialized_end = 8773
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 8706
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 8773
+ _WITHCOLUMNS._serialized_start = 8775
+ _WITHCOLUMNS._serialized_end = 8894
+ _HINT._serialized_start = 8897
+ _HINT._serialized_end = 9029
+ _UNPIVOT._serialized_start = 9032
+ _UNPIVOT._serialized_end = 9359
+ _UNPIVOT_VALUES._serialized_start = 9289
+ _UNPIVOT_VALUES._serialized_end = 9348
+ _TOSCHEMA._serialized_start = 9361
+ _TOSCHEMA._serialized_end = 9467
+ _REPARTITIONBYEXPRESSION._serialized_start = 9470
+ _REPARTITIONBYEXPRESSION._serialized_end = 9673
+ _FRAMEMAP._serialized_start = 9675
+ _FRAMEMAP._serialized_end = 9800
+ _COLLECTMETRICS._serialized_start = 9803
+ _COLLECTMETRICS._serialized_end = 9939
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi
b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index e4d1653f0ba..d434451082e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -610,6 +610,7 @@ class Read(google.protobuf.message.Message):
SCHEMA_FIELD_NUMBER: builtins.int
OPTIONS_FIELD_NUMBER: builtins.int
PATHS_FIELD_NUMBER: builtins.int
+ PREDICATES_FIELD_NUMBER: builtins.int
format: builtins.str
"""(Optional) Supported formats include: parquet, orc, text, json,
parquet, csv, avro.
@@ -633,6 +634,14 @@ class Read(google.protobuf.message.Message):
self,
) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
"""(Optional) A list of path for file-system backed data
sources."""
+ @property
+ def predicates(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+ """(Optional) Condition in the where clause for each partition.
+
+ This is only supported by the JDBC data source.
+ """
def __init__(
self,
*,
@@ -640,6 +649,7 @@ class Read(google.protobuf.message.Message):
schema: builtins.str | None = ...,
options: collections.abc.Mapping[builtins.str, builtins.str] |
None = ...,
paths: collections.abc.Iterable[builtins.str] | None = ...,
+ predicates: collections.abc.Iterable[builtins.str] | None = ...,
) -> None: ...
def HasField(
self,
@@ -667,6 +677,8 @@ class Read(google.protobuf.message.Message):
b"options",
"paths",
b"paths",
+ "predicates",
+ b"predicates",
"schema",
b"schema",
],
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]