[
https://issues.apache.org/jira/browse/BEAM-3171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16274775#comment-16274775
]
ASF GitHub Bot commented on BEAM-3171:
--------------------------------------
XuMingmin commented on a change in pull request #4196: [BEAM-3171] convert a join into lookup
URL: https://github.com/apache/beam/pull/4196#discussion_r154420997
##########
File path:
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
##########
@@ -295,4 +303,37 @@ private BeamRecord buildNullRow(BeamRelNode relNode) {
     return new Pair<>(leftIndex, rightIndex);
   }
+
+  private PCollection<BeamRecord> joinAsLookup(BeamRelNode leftRelNode, BeamRelNode rightRelNode,
+      PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv) throws Exception {
+    PCollection<BeamRecord> factStream = leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    BeamSqlSeekableTable seekableTable = getSeekableTableFromRelNode(rightRelNode, sqlEnv);
+
+    return factStream.apply("join_as_lookup",
+        new BeamJoinTransforms.JoinAsLookup(condition, seekableTable,
+            CalciteUtils.toBeamRowType(rightRelNode.getRowType()),
+            CalciteUtils.toBeamRowType(leftRelNode.getRowType()).getFieldCount()));
+  }
+
+  private BeamSqlSeekableTable getSeekableTableFromRelNode(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
+    BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
+    String tableName = Joiner.on('.').join(srcRel.getTable().getQualifiedName());
Review comment:
Actually, `srcRel.getTable().getQualifiedName()` returns a `List`: Calcite has
a concept of `database`, which we haven't touched so far.
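
To illustrate the point about the qualified name being a `List`, here is a minimal sketch of how its segments collapse into a dotted table name. `QualifiedNames` and `toTableName` are hypothetical names used only for this illustration, and the JDK's `String.join` is swapped in for the Guava `Joiner` used in the diff so the snippet has no dependencies.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; not part of Beam or Calcite.
final class QualifiedNames {
    private QualifiedNames() {}

    // Joins the segments of a qualified name with '.'.
    // A single-segment name (no database part) is returned unchanged.
    static String toTableName(List<String> qualifiedName) {
        return String.join(".", qualifiedName);
    }

    public static void main(String[] args) {
        // Table name only, as when no database is involved.
        System.out.println(toTableName(Arrays.asList("orders")));
        // Database plus table name.
        System.out.println(toTableName(Arrays.asList("db", "orders")));
    }
}
```

With a single-segment list the joined name is just the table name, so the join is a no-op as long as the `database` level is unused.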
> convert a join into lookup
> --------------------------
>
> Key: BEAM-3171
> URL: https://issues.apache.org/jira/browse/BEAM-3171
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Xu Mingmin
> Assignee: Xu Mingmin
> Labels: experimental
>
> We mostly use BeamSQL to run streaming jobs, and we have added a join_as_lookup
> improvement (on an internal branch) to cover the streaming-to-batch case (similar to
> [1]). I could submit a PR as experimental if people are interested.
> The rough solution is: if one source of a join node implements
> {{BeamSeekableTable}} and the other does not, the join node is converted
> to a fact-lookup operation.
> Ref:
> [1]
> https://docs.google.com/document/d/1B-XnUwXh64lbswRieckU0BxtygSV58hysqZbpZmk03A/edit?usp=sharing
>
> [~xumingming] [~takidau] for any comments
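
The conversion rule in the description can be sketched roughly as follows. This is only an illustration under stated assumptions: `Table`, `SeekableTable`, `JoinPlanner`, and `Strategy` are hypothetical stand-ins, not Beam classes, and the real planner logic may differ.

```java
// Hypothetical stand-ins for a table source and a seekable (lookup-capable) one.
interface Table {}

interface SeekableTable extends Table {}

// Hypothetical planner helper; it only mirrors the rule stated in the issue.
final class JoinPlanner {
    enum Strategy { LOOKUP, REGULAR_JOIN }

    private JoinPlanner() {}

    // Chooses LOOKUP when exactly one side of the join is seekable,
    // and falls back to a regular join otherwise.
    static Strategy choose(Table left, Table right) {
        boolean leftSeekable = left instanceof SeekableTable;
        boolean rightSeekable = right instanceof SeekableTable;
        return (leftSeekable != rightSeekable) ? Strategy.LOOKUP : Strategy.REGULAR_JOIN;
    }

    public static void main(String[] args) {
        Table factStream = new Table() {};
        Table lookupTable = new SeekableTable() {};
        System.out.println(choose(factStream, lookupTable));
        System.out.println(choose(factStream, factStream));
    }
}
```

In this sketch the non-seekable side plays the role of the fact stream and the seekable side is the one queried per record.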