This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 400d4ded64e [SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect
400d4ded64e is described below
commit 400d4ded64efdf62b75f2bcdc6025d474d6395ee
Author: Takuya UESHIN <[email protected]>
AuthorDate: Wed Sep 27 15:09:47 2023 +0800
[SPARK-43662][PS][CONNECT] Support merge_asof in Spark Connect
### What changes were proposed in this pull request?
Supports `merge_asof` in Spark Connect.
### Why are the changes needed?
`merge_asof` is missing in Spark Connect.
Ref:
https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.merge_asof.html
### Does this PR introduce _any_ user-facing change?
Yes, `merge_asof` is available in Spark Connect.
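For example, the following now works against a Connect session (a minimal sketch; the connection URL and data are illustrative):
```python
import pyspark.pandas as ps
from pyspark.sql import SparkSession

# Connect to a Spark Connect server (URL is illustrative).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})

# Each left row is matched to the closest right key at or before it
# (direction="backward" is the default).
ps.merge_asof(left, right, on="a").sort_values("a")
```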
### How was this patch tested?
The parity tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #43137 from ueshin/issues/SPARK-43662/merge_asof.
Authored-by: Takuya UESHIN <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
---
.../main/protobuf/spark/connect/relations.proto | 42 ++++
.../sql/connect/planner/SparkConnectPlanner.scala | 37 +++
.../pandas/tests/connect/test_parity_reshape.py | 4 +-
python/pyspark/sql/connect/dataframe.py | 41 +++
python/pyspark/sql/connect/plan.py | 94 +++++++
python/pyspark/sql/connect/proto/relations_pb2.py | 278 +++++++++++----------
python/pyspark/sql/connect/proto/relations_pb2.pyi | 123 +++++++++
python/pyspark/sql/dataframe.py | 3 +
8 files changed, 481 insertions(+), 141 deletions(-)
diff --git a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
index 0cf08431d46..deb33978386 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/relations.proto
@@ -72,6 +72,7 @@ message Relation {
CachedLocalRelation cached_local_relation = 36;
CachedRemoteRelation cached_remote_relation = 37;
CommonInlineUserDefinedTableFunction common_inline_user_defined_table_function = 38;
+ AsOfJoin as_of_join = 39;
// NA functions
NAFill fill_na = 90;
@@ -1009,3 +1010,44 @@ message Parse {
PARSE_FORMAT_JSON = 2;
}
}
+
+// Relation of type [[AsOfJoin]].
+//
+// `left` and `right` must be present.
+message AsOfJoin {
+ // (Required) Left input relation for a Join.
+ Relation left = 1;
+
+ // (Required) Right input relation for a Join.
+ Relation right = 2;
+
+ // (Required) Field to join on in left DataFrame
+ Expression left_as_of = 3;
+
+ // (Required) Field to join on in right DataFrame
+ Expression right_as_of = 4;
+
+ // (Optional) The join condition. Could be unset when `using_columns` is utilized.
+ //
+ // This field does not co-exist with using_columns.
+ Expression join_expr = 5;
+
+ // (Optional) using_columns provides a list of columns that should be present on both sides of
+ // the join inputs that this Join will join on. For example A JOIN B USING col_name is
+ // equivalent to A JOIN B on A.col_name = B.col_name.
+ //
+ // This field does not co-exist with join_expr.
+ repeated string using_columns = 6;
+
+ // (Required) The join type.
+ string join_type = 7;
+
+ // (Optional) The asof tolerance within this range.
+ Expression tolerance = 8;
+
+ // (Required) Whether to allow matching with the same value or not.
+ bool allow_exact_matches = 9;
+
+ // (Required) Whether to search for prior, subsequent, or closest matches.
+ string direction = 10;
+}
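For reference, here is a minimal sketch of what the new message carries on the wire, built directly with the generated Python classes (field values are illustrative; the client normally assembles this via `plan.AsOfJoin` below):
```python
from pyspark.sql.connect.proto import relations_pb2

# Scalar fields of the new relation; `left`, `right`, `left_as_of` and
# `right_as_of` would be populated from the child plans and expressions.
rel = relations_pb2.AsOfJoin(
    join_type="inner",
    allow_exact_matches=True,
    direction="backward",
)
rel.using_columns.append("id")  # mutually exclusive with join_expr
print(rel)
```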
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index dda7a713fa0..e6b5d89e1ab 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -110,6 +110,7 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
case proto.Relation.RelTypeCase.OFFSET => transformOffset(rel.getOffset)
case proto.Relation.RelTypeCase.TAIL => transformTail(rel.getTail)
case proto.Relation.RelTypeCase.JOIN => transformJoinOrJoinWith(rel.getJoin)
+ case proto.Relation.RelTypeCase.AS_OF_JOIN => transformAsOfJoin(rel.getAsOfJoin)
case proto.Relation.RelTypeCase.DEDUPLICATE => transformDeduplicate(rel.getDeduplicate)
case proto.Relation.RelTypeCase.SET_OP => transformSetOperation(rel.getSetOp)
case proto.Relation.RelTypeCase.SORT => transformSort(rel.getSort)
@@ -2275,6 +2276,42 @@ class SparkConnectPlanner(val sessionHolder: SessionHolder) extends Logging {
}
}
+ private def transformAsOfJoin(rel: proto.AsOfJoin): LogicalPlan = {
+ val left = Dataset.ofRows(session, transformRelation(rel.getLeft))
+ val right = Dataset.ofRows(session, transformRelation(rel.getRight))
+ val leftAsOf = Column(transformExpression(rel.getLeftAsOf))
+ val rightAsOf = Column(transformExpression(rel.getRightAsOf))
+ val joinType = rel.getJoinType
+ val tolerance = if (rel.hasTolerance) Column(transformExpression(rel.getTolerance)) else null
+ val allowExactMatches = rel.getAllowExactMatches
+ val direction = rel.getDirection
+
+ val joined = if (rel.getUsingColumnsCount > 0) {
+ val usingColumns = rel.getUsingColumnsList.asScala.toSeq
+ left.joinAsOf(
+ other = right,
+ leftAsOf = leftAsOf,
+ rightAsOf = rightAsOf,
+ usingColumns = usingColumns,
+ joinType = joinType,
+ tolerance = tolerance,
+ allowExactMatches = allowExactMatches,
+ direction = direction)
+ } else {
+ val joinExprs = if (rel.hasJoinExpr) Column(transformExpression(rel.getJoinExpr)) else null
+ left.joinAsOf(
+ other = right,
+ leftAsOf = leftAsOf,
+ rightAsOf = rightAsOf,
+ joinExprs = joinExprs,
+ joinType = joinType,
+ tolerance = tolerance,
+ allowExactMatches = allowExactMatches,
+ direction = direction)
+ }
+ joined.logicalPlan
+ }
+
private def transformSort(sort: proto.Sort): LogicalPlan = {
assert(sort.getOrderCount > 0, "'order' must be present and contain elements.")
logical.Sort(
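The planner simply rebuilds both children as DataFrames and delegates to the existing `Dataset.joinAsOf`, passing `null` for `tolerance`/`joinExprs` when the optional fields are unset. The PySpark-side counterpart it mirrors can be exercised directly, e.g. (a sketch with illustrative data):
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

left = spark.createDataFrame([(1, "a"), (5, "b"), (10, "c")], ["a", "left_val"])
right = spark.createDataFrame([(1, 1), (2, 2), (3, 3), (6, 6), (7, 7)], ["a", "right_val"])

# _joinAsOf is the private DataFrame API that pyspark.pandas' merge_asof builds on.
left._joinAsOf(right, leftAsOfColumn="a", rightAsOfColumn="a", how="left").show()
```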
diff --git a/python/pyspark/pandas/tests/connect/test_parity_reshape.py b/python/pyspark/pandas/tests/connect/test_parity_reshape.py
index 0773978ba4b..356baaff5ba 100644
--- a/python/pyspark/pandas/tests/connect/test_parity_reshape.py
+++ b/python/pyspark/pandas/tests/connect/test_parity_reshape.py
@@ -22,9 +22,7 @@ from pyspark.testing.pandasutils import PandasOnSparkTestUtils
class ReshapeParityTests(ReshapeTestsMixin, PandasOnSparkTestUtils, ReusedConnectTestCase):
- @unittest.skip("TODO(SPARK-43662): Enable ReshapeParityTests.test_merge_asof.")
- def test_merge_asof(self):
- super().test_merge_asof()
+ pass
if __name__ == "__main__":
diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 9cc1ddead33..5e2623336a2 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -594,6 +594,47 @@ class DataFrame:
join.__doc__ = PySparkDataFrame.join.__doc__
+ def _joinAsOf(
+ self,
+ other: "DataFrame",
+ leftAsOfColumn: Union[str, Column],
+ rightAsOfColumn: Union[str, Column],
+ on: Optional[Union[str, List[str], Column, List[Column]]] = None,
+ how: Optional[str] = None,
+ *,
+ tolerance: Optional[Column] = None,
+ allowExactMatches: bool = True,
+ direction: str = "backward",
+ ) -> "DataFrame":
+ if self._plan is None:
+ raise Exception("Cannot join when self._plan is empty.")
+ if other._plan is None:
+ raise Exception("Cannot join when other._plan is empty.")
+
+ if how is None:
+ how = "inner"
+ assert isinstance(how, str), "how should be a string"
+
+ if tolerance is not None:
+ assert isinstance(tolerance, Column), "tolerance should be Column"
+
+ return DataFrame.withPlan(
+ plan.AsOfJoin(
+ left=self._plan,
+ right=other._plan,
+ left_as_of=leftAsOfColumn,
+ right_as_of=rightAsOfColumn,
+ on=on,
+ how=how,
+ tolerance=tolerance,
+ allow_exact_matches=allowExactMatches,
+ direction=direction,
+ ),
+ session=self._session,
+ )
+
+ _joinAsOf.__doc__ = PySparkDataFrame._joinAsOf.__doc__
+
def limit(self, n: int) -> "DataFrame":
return DataFrame.withPlan(plan.Limit(child=self._plan, limit=n), session=self._session)
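Like the non-Connect implementation, `how` defaults to "inner" and `tolerance` must already be a `Column`. A hypothetical call (a sketch; the session URL and data are illustrative):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Assumes a Spark Connect session (URL illustrative).
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
df1 = spark.createDataFrame([(1, "x"), (4, "y")], ["ts", "v1"])
df2 = spark.createDataFrame([(2, 10), (3, 20)], ["ts", "v2"])

df1._joinAsOf(
    df2,
    leftAsOfColumn="ts",
    rightAsOfColumn="ts",
    tolerance=lit(10),        # must be a Column; a plain int trips the assertion above
    allowExactMatches=False,  # exclude rows with equal keys
    direction="forward",      # match the nearest subsequent key
).show()
```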
diff --git a/python/pyspark/sql/connect/plan.py b/python/pyspark/sql/connect/plan.py
index 6758b3673f3..10565b9965a 100644
--- a/python/pyspark/sql/connect/plan.py
+++ b/python/pyspark/sql/connect/plan.py
@@ -900,6 +900,100 @@ class Join(LogicalPlan):
"""
+class AsOfJoin(LogicalPlan):
+ def __init__(
+ self,
+ left: LogicalPlan,
+ right: LogicalPlan,
+ left_as_of: "ColumnOrName",
+ right_as_of: "ColumnOrName",
+ on: Optional[Union[str, List[str], Column, List[Column]]],
+ how: str,
+ tolerance: Optional[Column],
+ allow_exact_matches: bool,
+ direction: str,
+ ) -> None:
+ super().__init__(left)
+ self.left = left
+ self.right = right
+ self.left_as_of = left_as_of
+ self.right_as_of = right_as_of
+ self.on = on
+ self.how = how
+ self.tolerance = tolerance
+ self.allow_exact_matches = allow_exact_matches
+ self.direction = direction
+
+ def plan(self, session: "SparkConnectClient") -> proto.Relation:
+ plan = self._create_proto_relation()
+ plan.as_of_join.left.CopyFrom(self.left.plan(session))
+ plan.as_of_join.right.CopyFrom(self.right.plan(session))
+
+ if isinstance(self.left_as_of, Column):
+ plan.as_of_join.left_as_of.CopyFrom(self.left_as_of.to_plan(session))
+ else:
+ plan.as_of_join.left_as_of.CopyFrom(
+ ColumnReference(self.left_as_of, self.left._plan_id).to_plan(session)
+ )
+
+ if isinstance(self.right_as_of, Column):
+ plan.as_of_join.right_as_of.CopyFrom(self.right_as_of.to_plan(session))
+ else:
+ plan.as_of_join.right_as_of.CopyFrom(
+ ColumnReference(self.right_as_of, self.right._plan_id).to_plan(session)
+ )
+
+ if self.on is not None:
+ if not isinstance(self.on, list):
+ if isinstance(self.on, str):
+ plan.as_of_join.using_columns.append(self.on)
+ else:
+ plan.as_of_join.join_expr.CopyFrom(self.on.to_plan(session))
+ elif len(self.on) > 0:
+ if isinstance(self.on[0], str):
+ plan.as_of_join.using_columns.extend(cast(List[str], self.on))
+ else:
+ merge_column = functools.reduce(lambda c1, c2: c1 & c2, self.on)
+ plan.as_of_join.join_expr.CopyFrom(cast(Column, merge_column).to_plan(session))
+
+ plan.as_of_join.join_type = self.how
+
+ if self.tolerance is not None:
+ plan.as_of_join.tolerance.CopyFrom(self.tolerance.to_plan(session))
+
+ plan.as_of_join.allow_exact_matches = self.allow_exact_matches
+ plan.as_of_join.direction = self.direction
+
+ return plan
+
+ def print(self, indent: int = 0) -> str:
+ assert self.left is not None
+ assert self.right is not None
+
+ i = " " * indent
+ o = " " * (indent + LogicalPlan.INDENT)
+ n = indent + LogicalPlan.INDENT * 2
+ return (
+ f"{i}<AsOfJoin left_as_of={self.left_as_of},
right_as_of={self.right_as_of}, "
+ f"on={self.on} how={self.how}>\n{o}"
+ f"left=\n{self.left.print(n)}\n{o}right=\n{self.right.print(n)}"
+ )
+
+ def _repr_html_(self) -> str:
+ assert self.left is not None
+ assert self.right is not None
+
+ return f"""
+ <ul>
+ <li>
+ <b>AsOfJoin</b><br />
+ Left: {self.left._repr_html_()}
+ Right: {self.right._repr_html_()}
+ </li>
+ </ul>
+ """
+
+
class SetOperation(LogicalPlan):
def __init__(
self,
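When `on` is a list of Columns, the conditions are folded into a single conjunction before being serialized as `join_expr`, as sketched below (column names are illustrative):
```python
import functools
from pyspark.sql.functions import col

conditions = [col("l.id") == col("r.id"), col("l.grp") == col("r.grp")]
# Mirrors the reduce in AsOfJoin.plan: (l.id = r.id) AND (l.grp = r.grp)
merged = functools.reduce(lambda c1, c2: c1 & c2, conditions)
```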
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.py b/python/pyspark/sql/connect/proto/relations_pb2.py
index ebb2682c619..fc70cdea402 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.py
+++ b/python/pyspark/sql/connect/proto/relations_pb2.py
@@ -35,7 +35,7 @@ from pyspark.sql.connect.proto import catalog_pb2 as spark_dot_connect_dot_catal
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
- b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\xe1\x18\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
+ b'\n\x1dspark/connect/relations.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1fspark/connect/expressions.proto\x1a\x19spark/connect/types.proto\x1a\x1bspark/connect/catalog.proto"\x9a\x19\n\x08Relation\x12\x35\n\x06\x63ommon\x18\x01 \x01(\x0b\x32\x1d.spark.connect.RelationCommonR\x06\x63ommon\x12)\n\x04read\x18\x02 \x01(\x0b\x32\x13.spark.connect.ReadH\x00R\x04read\x12\x32\n\x07project\x18\x03 \x01(\x0b\x32\x16.spark.connect.ProjectH\x00R\x07project\x12/\n\x06\x66il [...]
)
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
@@ -62,141 +62,143 @@ if _descriptor._USE_C_DESCRIPTORS == False:
_PARSE_OPTIONSENTRY._options = None
_PARSE_OPTIONSENTRY._serialized_options = b"8\001"
_RELATION._serialized_start = 165
- _RELATION._serialized_end = 3334
- _UNKNOWN._serialized_start = 3336
- _UNKNOWN._serialized_end = 3345
- _RELATIONCOMMON._serialized_start = 3347
- _RELATIONCOMMON._serialized_end = 3438
- _SQL._serialized_start = 3441
- _SQL._serialized_end = 3919
- _SQL_ARGSENTRY._serialized_start = 3735
- _SQL_ARGSENTRY._serialized_end = 3825
- _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3827
- _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3919
- _READ._serialized_start = 3922
- _READ._serialized_end = 4585
- _READ_NAMEDTABLE._serialized_start = 4100
- _READ_NAMEDTABLE._serialized_end = 4292
- _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4234
- _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4292
- _READ_DATASOURCE._serialized_start = 4295
- _READ_DATASOURCE._serialized_end = 4572
- _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4234
- _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4292
- _PROJECT._serialized_start = 4587
- _PROJECT._serialized_end = 4704
- _FILTER._serialized_start = 4706
- _FILTER._serialized_end = 4818
- _JOIN._serialized_start = 4821
- _JOIN._serialized_end = 5482
- _JOIN_JOINDATATYPE._serialized_start = 5160
- _JOIN_JOINDATATYPE._serialized_end = 5252
- _JOIN_JOINTYPE._serialized_start = 5255
- _JOIN_JOINTYPE._serialized_end = 5463
- _SETOPERATION._serialized_start = 5485
- _SETOPERATION._serialized_end = 5964
- _SETOPERATION_SETOPTYPE._serialized_start = 5801
- _SETOPERATION_SETOPTYPE._serialized_end = 5915
- _LIMIT._serialized_start = 5966
- _LIMIT._serialized_end = 6042
- _OFFSET._serialized_start = 6044
- _OFFSET._serialized_end = 6123
- _TAIL._serialized_start = 6125
- _TAIL._serialized_end = 6200
- _AGGREGATE._serialized_start = 6203
- _AGGREGATE._serialized_end = 6785
- _AGGREGATE_PIVOT._serialized_start = 6542
- _AGGREGATE_PIVOT._serialized_end = 6653
- _AGGREGATE_GROUPTYPE._serialized_start = 6656
- _AGGREGATE_GROUPTYPE._serialized_end = 6785
- _SORT._serialized_start = 6788
- _SORT._serialized_end = 6948
- _DROP._serialized_start = 6951
- _DROP._serialized_end = 7092
- _DEDUPLICATE._serialized_start = 7095
- _DEDUPLICATE._serialized_end = 7335
- _LOCALRELATION._serialized_start = 7337
- _LOCALRELATION._serialized_end = 7426
- _CACHEDLOCALRELATION._serialized_start = 7428
- _CACHEDLOCALRELATION._serialized_end = 7500
- _CACHEDREMOTERELATION._serialized_start = 7502
- _CACHEDREMOTERELATION._serialized_end = 7557
- _SAMPLE._serialized_start = 7560
- _SAMPLE._serialized_end = 7833
- _RANGE._serialized_start = 7836
- _RANGE._serialized_end = 7981
- _SUBQUERYALIAS._serialized_start = 7983
- _SUBQUERYALIAS._serialized_end = 8097
- _REPARTITION._serialized_start = 8100
- _REPARTITION._serialized_end = 8242
- _SHOWSTRING._serialized_start = 8245
- _SHOWSTRING._serialized_end = 8387
- _HTMLSTRING._serialized_start = 8389
- _HTMLSTRING._serialized_end = 8503
- _STATSUMMARY._serialized_start = 8505
- _STATSUMMARY._serialized_end = 8597
- _STATDESCRIBE._serialized_start = 8599
- _STATDESCRIBE._serialized_end = 8680
- _STATCROSSTAB._serialized_start = 8682
- _STATCROSSTAB._serialized_end = 8783
- _STATCOV._serialized_start = 8785
- _STATCOV._serialized_end = 8881
- _STATCORR._serialized_start = 8884
- _STATCORR._serialized_end = 9021
- _STATAPPROXQUANTILE._serialized_start = 9024
- _STATAPPROXQUANTILE._serialized_end = 9188
- _STATFREQITEMS._serialized_start = 9190
- _STATFREQITEMS._serialized_end = 9315
- _STATSAMPLEBY._serialized_start = 9318
- _STATSAMPLEBY._serialized_end = 9627
- _STATSAMPLEBY_FRACTION._serialized_start = 9519
- _STATSAMPLEBY_FRACTION._serialized_end = 9618
- _NAFILL._serialized_start = 9630
- _NAFILL._serialized_end = 9764
- _NADROP._serialized_start = 9767
- _NADROP._serialized_end = 9901
- _NAREPLACE._serialized_start = 9904
- _NAREPLACE._serialized_end = 10200
- _NAREPLACE_REPLACEMENT._serialized_start = 10059
- _NAREPLACE_REPLACEMENT._serialized_end = 10200
- _TODF._serialized_start = 10202
- _TODF._serialized_end = 10290
- _WITHCOLUMNSRENAMED._serialized_start = 10293
- _WITHCOLUMNSRENAMED._serialized_end = 10532
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10465
- _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10532
- _WITHCOLUMNS._serialized_start = 10534
- _WITHCOLUMNS._serialized_end = 10653
- _WITHWATERMARK._serialized_start = 10656
- _WITHWATERMARK._serialized_end = 10790
- _HINT._serialized_start = 10793
- _HINT._serialized_end = 10925
- _UNPIVOT._serialized_start = 10928
- _UNPIVOT._serialized_end = 11255
- _UNPIVOT_VALUES._serialized_start = 11185
- _UNPIVOT_VALUES._serialized_end = 11244
- _TOSCHEMA._serialized_start = 11257
- _TOSCHEMA._serialized_end = 11363
- _REPARTITIONBYEXPRESSION._serialized_start = 11366
- _REPARTITIONBYEXPRESSION._serialized_end = 11569
- _MAPPARTITIONS._serialized_start = 11572
- _MAPPARTITIONS._serialized_end = 11753
- _GROUPMAP._serialized_start = 11756
- _GROUPMAP._serialized_end = 12391
- _COGROUPMAP._serialized_start = 12394
- _COGROUPMAP._serialized_end = 12920
- _APPLYINPANDASWITHSTATE._serialized_start = 12923
- _APPLYINPANDASWITHSTATE._serialized_end = 13280
- _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13283
- _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13527
- _PYTHONUDTF._serialized_start = 13530
- _PYTHONUDTF._serialized_end = 13707
- _COLLECTMETRICS._serialized_start = 13710
- _COLLECTMETRICS._serialized_end = 13846
- _PARSE._serialized_start = 13849
- _PARSE._serialized_end = 14237
- _PARSE_OPTIONSENTRY._serialized_start = 4234
- _PARSE_OPTIONSENTRY._serialized_end = 4292
- _PARSE_PARSEFORMAT._serialized_start = 14138
- _PARSE_PARSEFORMAT._serialized_end = 14226
+ _RELATION._serialized_end = 3391
+ _UNKNOWN._serialized_start = 3393
+ _UNKNOWN._serialized_end = 3402
+ _RELATIONCOMMON._serialized_start = 3404
+ _RELATIONCOMMON._serialized_end = 3495
+ _SQL._serialized_start = 3498
+ _SQL._serialized_end = 3976
+ _SQL_ARGSENTRY._serialized_start = 3792
+ _SQL_ARGSENTRY._serialized_end = 3882
+ _SQL_NAMEDARGUMENTSENTRY._serialized_start = 3884
+ _SQL_NAMEDARGUMENTSENTRY._serialized_end = 3976
+ _READ._serialized_start = 3979
+ _READ._serialized_end = 4642
+ _READ_NAMEDTABLE._serialized_start = 4157
+ _READ_NAMEDTABLE._serialized_end = 4349
+ _READ_NAMEDTABLE_OPTIONSENTRY._serialized_start = 4291
+ _READ_NAMEDTABLE_OPTIONSENTRY._serialized_end = 4349
+ _READ_DATASOURCE._serialized_start = 4352
+ _READ_DATASOURCE._serialized_end = 4629
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_start = 4291
+ _READ_DATASOURCE_OPTIONSENTRY._serialized_end = 4349
+ _PROJECT._serialized_start = 4644
+ _PROJECT._serialized_end = 4761
+ _FILTER._serialized_start = 4763
+ _FILTER._serialized_end = 4875
+ _JOIN._serialized_start = 4878
+ _JOIN._serialized_end = 5539
+ _JOIN_JOINDATATYPE._serialized_start = 5217
+ _JOIN_JOINDATATYPE._serialized_end = 5309
+ _JOIN_JOINTYPE._serialized_start = 5312
+ _JOIN_JOINTYPE._serialized_end = 5520
+ _SETOPERATION._serialized_start = 5542
+ _SETOPERATION._serialized_end = 6021
+ _SETOPERATION_SETOPTYPE._serialized_start = 5858
+ _SETOPERATION_SETOPTYPE._serialized_end = 5972
+ _LIMIT._serialized_start = 6023
+ _LIMIT._serialized_end = 6099
+ _OFFSET._serialized_start = 6101
+ _OFFSET._serialized_end = 6180
+ _TAIL._serialized_start = 6182
+ _TAIL._serialized_end = 6257
+ _AGGREGATE._serialized_start = 6260
+ _AGGREGATE._serialized_end = 6842
+ _AGGREGATE_PIVOT._serialized_start = 6599
+ _AGGREGATE_PIVOT._serialized_end = 6710
+ _AGGREGATE_GROUPTYPE._serialized_start = 6713
+ _AGGREGATE_GROUPTYPE._serialized_end = 6842
+ _SORT._serialized_start = 6845
+ _SORT._serialized_end = 7005
+ _DROP._serialized_start = 7008
+ _DROP._serialized_end = 7149
+ _DEDUPLICATE._serialized_start = 7152
+ _DEDUPLICATE._serialized_end = 7392
+ _LOCALRELATION._serialized_start = 7394
+ _LOCALRELATION._serialized_end = 7483
+ _CACHEDLOCALRELATION._serialized_start = 7485
+ _CACHEDLOCALRELATION._serialized_end = 7557
+ _CACHEDREMOTERELATION._serialized_start = 7559
+ _CACHEDREMOTERELATION._serialized_end = 7614
+ _SAMPLE._serialized_start = 7617
+ _SAMPLE._serialized_end = 7890
+ _RANGE._serialized_start = 7893
+ _RANGE._serialized_end = 8038
+ _SUBQUERYALIAS._serialized_start = 8040
+ _SUBQUERYALIAS._serialized_end = 8154
+ _REPARTITION._serialized_start = 8157
+ _REPARTITION._serialized_end = 8299
+ _SHOWSTRING._serialized_start = 8302
+ _SHOWSTRING._serialized_end = 8444
+ _HTMLSTRING._serialized_start = 8446
+ _HTMLSTRING._serialized_end = 8560
+ _STATSUMMARY._serialized_start = 8562
+ _STATSUMMARY._serialized_end = 8654
+ _STATDESCRIBE._serialized_start = 8656
+ _STATDESCRIBE._serialized_end = 8737
+ _STATCROSSTAB._serialized_start = 8739
+ _STATCROSSTAB._serialized_end = 8840
+ _STATCOV._serialized_start = 8842
+ _STATCOV._serialized_end = 8938
+ _STATCORR._serialized_start = 8941
+ _STATCORR._serialized_end = 9078
+ _STATAPPROXQUANTILE._serialized_start = 9081
+ _STATAPPROXQUANTILE._serialized_end = 9245
+ _STATFREQITEMS._serialized_start = 9247
+ _STATFREQITEMS._serialized_end = 9372
+ _STATSAMPLEBY._serialized_start = 9375
+ _STATSAMPLEBY._serialized_end = 9684
+ _STATSAMPLEBY_FRACTION._serialized_start = 9576
+ _STATSAMPLEBY_FRACTION._serialized_end = 9675
+ _NAFILL._serialized_start = 9687
+ _NAFILL._serialized_end = 9821
+ _NADROP._serialized_start = 9824
+ _NADROP._serialized_end = 9958
+ _NAREPLACE._serialized_start = 9961
+ _NAREPLACE._serialized_end = 10257
+ _NAREPLACE_REPLACEMENT._serialized_start = 10116
+ _NAREPLACE_REPLACEMENT._serialized_end = 10257
+ _TODF._serialized_start = 10259
+ _TODF._serialized_end = 10347
+ _WITHCOLUMNSRENAMED._serialized_start = 10350
+ _WITHCOLUMNSRENAMED._serialized_end = 10589
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_start = 10522
+ _WITHCOLUMNSRENAMED_RENAMECOLUMNSMAPENTRY._serialized_end = 10589
+ _WITHCOLUMNS._serialized_start = 10591
+ _WITHCOLUMNS._serialized_end = 10710
+ _WITHWATERMARK._serialized_start = 10713
+ _WITHWATERMARK._serialized_end = 10847
+ _HINT._serialized_start = 10850
+ _HINT._serialized_end = 10982
+ _UNPIVOT._serialized_start = 10985
+ _UNPIVOT._serialized_end = 11312
+ _UNPIVOT_VALUES._serialized_start = 11242
+ _UNPIVOT_VALUES._serialized_end = 11301
+ _TOSCHEMA._serialized_start = 11314
+ _TOSCHEMA._serialized_end = 11420
+ _REPARTITIONBYEXPRESSION._serialized_start = 11423
+ _REPARTITIONBYEXPRESSION._serialized_end = 11626
+ _MAPPARTITIONS._serialized_start = 11629
+ _MAPPARTITIONS._serialized_end = 11810
+ _GROUPMAP._serialized_start = 11813
+ _GROUPMAP._serialized_end = 12448
+ _COGROUPMAP._serialized_start = 12451
+ _COGROUPMAP._serialized_end = 12977
+ _APPLYINPANDASWITHSTATE._serialized_start = 12980
+ _APPLYINPANDASWITHSTATE._serialized_end = 13337
+ _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_start = 13340
+ _COMMONINLINEUSERDEFINEDTABLEFUNCTION._serialized_end = 13584
+ _PYTHONUDTF._serialized_start = 13587
+ _PYTHONUDTF._serialized_end = 13764
+ _COLLECTMETRICS._serialized_start = 13767
+ _COLLECTMETRICS._serialized_end = 13903
+ _PARSE._serialized_start = 13906
+ _PARSE._serialized_end = 14294
+ _PARSE_OPTIONSENTRY._serialized_start = 4291
+ _PARSE_OPTIONSENTRY._serialized_end = 4349
+ _PARSE_PARSEFORMAT._serialized_start = 14195
+ _PARSE_PARSEFORMAT._serialized_end = 14283
+ _ASOFJOIN._serialized_start = 14297
+ _ASOFJOIN._serialized_end = 14772
# @@protoc_insertion_point(module_scope)
diff --git a/python/pyspark/sql/connect/proto/relations_pb2.pyi b/python/pyspark/sql/connect/proto/relations_pb2.pyi
index fb4a3661764..5bca4f21b2e 100644
--- a/python/pyspark/sql/connect/proto/relations_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/relations_pb2.pyi
@@ -100,6 +100,7 @@ class Relation(google.protobuf.message.Message):
CACHED_LOCAL_RELATION_FIELD_NUMBER: builtins.int
CACHED_REMOTE_RELATION_FIELD_NUMBER: builtins.int
COMMON_INLINE_USER_DEFINED_TABLE_FUNCTION_FIELD_NUMBER: builtins.int
+ AS_OF_JOIN_FIELD_NUMBER: builtins.int
FILL_NA_FIELD_NUMBER: builtins.int
DROP_NA_FIELD_NUMBER: builtins.int
REPLACE_FIELD_NUMBER: builtins.int
@@ -193,6 +194,8 @@ class Relation(google.protobuf.message.Message):
self,
) -> global___CommonInlineUserDefinedTableFunction: ...
@property
+ def as_of_join(self) -> global___AsOfJoin: ...
+ @property
def fill_na(self) -> global___NAFill:
"""NA functions"""
@property
@@ -268,6 +271,7 @@ class Relation(google.protobuf.message.Message):
cached_remote_relation: global___CachedRemoteRelation | None = ...,
common_inline_user_defined_table_function: global___CommonInlineUserDefinedTableFunction
| None = ...,
+ as_of_join: global___AsOfJoin | None = ...,
fill_na: global___NAFill | None = ...,
drop_na: global___NADrop | None = ...,
replace: global___NAReplace | None = ...,
@@ -292,6 +296,8 @@ class Relation(google.protobuf.message.Message):
b"apply_in_pandas_with_state",
"approx_quantile",
b"approx_quantile",
+ "as_of_join",
+ b"as_of_join",
"cached_local_relation",
b"cached_local_relation",
"cached_remote_relation",
@@ -403,6 +409,8 @@ class Relation(google.protobuf.message.Message):
b"apply_in_pandas_with_state",
"approx_quantile",
b"approx_quantile",
+ "as_of_join",
+ b"as_of_join",
"cached_local_relation",
b"cached_local_relation",
"cached_remote_relation",
@@ -546,6 +554,7 @@ class Relation(google.protobuf.message.Message):
"cached_local_relation",
"cached_remote_relation",
"common_inline_user_defined_table_function",
+ "as_of_join",
"fill_na",
"drop_na",
"replace",
@@ -3672,3 +3681,117 @@ class Parse(google.protobuf.message.Message):
) -> typing_extensions.Literal["schema"] | None: ...
global___Parse = Parse
+
+class AsOfJoin(google.protobuf.message.Message):
+ """Relation of type [[AsOfJoin]].
+
+ `left` and `right` must be present.
+ """
+
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ LEFT_FIELD_NUMBER: builtins.int
+ RIGHT_FIELD_NUMBER: builtins.int
+ LEFT_AS_OF_FIELD_NUMBER: builtins.int
+ RIGHT_AS_OF_FIELD_NUMBER: builtins.int
+ JOIN_EXPR_FIELD_NUMBER: builtins.int
+ USING_COLUMNS_FIELD_NUMBER: builtins.int
+ JOIN_TYPE_FIELD_NUMBER: builtins.int
+ TOLERANCE_FIELD_NUMBER: builtins.int
+ ALLOW_EXACT_MATCHES_FIELD_NUMBER: builtins.int
+ DIRECTION_FIELD_NUMBER: builtins.int
+ @property
+ def left(self) -> global___Relation:
+ """(Required) Left input relation for a Join."""
+ @property
+ def right(self) -> global___Relation:
+ """(Required) Right input relation for a Join."""
+ @property
+ def left_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """(Required) Field to join on in left DataFrame"""
+ @property
+ def right_as_of(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """(Required) Field to join on in right DataFrame"""
+ @property
+ def join_expr(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """(Optional) The join condition. Could be unset when `using_columns` is utilized.
+
+ This field does not co-exist with using_columns.
+ """
+ @property
+ def using_columns(
+ self,
+ ) ->
google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]:
+ """Optional. using_columns provides a list of columns that should
present on both sides of
+ the join inputs that this Join will join on. For example A JOIN B
USING col_name is
+ equivalent to A JOIN B on A.col_name = B.col_name.
+
+ This field does not co-exist with join_condition.
+ """
+ join_type: builtins.str
+ """(Required) The join type."""
+ @property
+ def tolerance(self) -> pyspark.sql.connect.proto.expressions_pb2.Expression:
+ """(Optional) The asof tolerance within this range."""
+ allow_exact_matches: builtins.bool
+ """(Required) Whether allow matching with the same value or not."""
+ direction: builtins.str
+ """(Required) Whether to search for prior, subsequent, or closest
matches."""
+ def __init__(
+ self,
+ *,
+ left: global___Relation | None = ...,
+ right: global___Relation | None = ...,
+ left_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ right_as_of: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ join_expr: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ using_columns: collections.abc.Iterable[builtins.str] | None = ...,
+ join_type: builtins.str = ...,
+ tolerance: pyspark.sql.connect.proto.expressions_pb2.Expression | None = ...,
+ allow_exact_matches: builtins.bool = ...,
+ direction: builtins.str = ...,
+ ) -> None: ...
+ def HasField(
+ self,
+ field_name: typing_extensions.Literal[
+ "join_expr",
+ b"join_expr",
+ "left",
+ b"left",
+ "left_as_of",
+ b"left_as_of",
+ "right",
+ b"right",
+ "right_as_of",
+ b"right_as_of",
+ "tolerance",
+ b"tolerance",
+ ],
+ ) -> builtins.bool: ...
+ def ClearField(
+ self,
+ field_name: typing_extensions.Literal[
+ "allow_exact_matches",
+ b"allow_exact_matches",
+ "direction",
+ b"direction",
+ "join_expr",
+ b"join_expr",
+ "join_type",
+ b"join_type",
+ "left",
+ b"left",
+ "left_as_of",
+ b"left_as_of",
+ "right",
+ b"right",
+ "right_as_of",
+ b"right_as_of",
+ "tolerance",
+ b"tolerance",
+ "using_columns",
+ b"using_columns",
+ ],
+ ) -> None: ...
+
+global___AsOfJoin = AsOfJoin
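The stub follows the usual protobuf presence rules: `HasField` is only defined for the message-typed fields (`left`, `right`, `left_as_of`, `right_as_of`, `join_expr`, `tolerance`), which is what lets the planner treat an unset `tolerance` as absent. A quick illustration:
```python
from pyspark.sql.connect.proto.relations_pb2 import AsOfJoin

msg = AsOfJoin(direction="forward")
msg.HasField("tolerance")    # False: the optional submessage is unset
msg.tolerance.SetInParent()  # explicitly mark it present
msg.HasField("tolerance")    # True
# msg.HasField("direction") would raise ValueError: plain proto3 scalars
# have no presence, hence "direction" is absent from the Literal above.
```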
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index bcdae5e40b9..51f18e7b6f3 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -2751,6 +2751,9 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
This is similar to a left-join except that we match on the nearest
key rather than equal keys.
+ .. versionchanged:: 4.0.0
+ Supports Spark Connect.
+
Parameters
----------
other : :class:`DataFrame`
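A sketch of those nearest-key semantics with a tolerance (illustrative data):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
quotes = spark.createDataFrame([(1, 720.5), (3, 720.9), (9, 721.2)], ["time", "quote"])
trades = spark.createDataFrame([(2, 100), (8, 200)], ["time", "qty"])

# The trade at time=2 matches the quote at time=1; the trade at time=8 finds
# no quote within 2 time units looking backward, so its quote columns are null.
trades._joinAsOf(
    quotes, leftAsOfColumn="time", rightAsOfColumn="time",
    how="left", tolerance=lit(2), direction="backward",
).show()
```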