[
https://issues.apache.org/jira/browse/BEAM-3171?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16275482#comment-16275482
]
ASF GitHub Bot commented on BEAM-3171:
--------------------------------------
xumingming closed pull request #4196: [BEAM-3171] convert a join into lookup
URL: https://github.com/apache/beam/pull/4196
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
new file mode 100644
index 00000000000..dbfe119ccc5
--- /dev/null
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlSeekableTable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.values.BeamRecord;
+
+/**
+ * A table that supports key-based lookups. A JOIN against such a table can be executed as an
+ * inline lookup instead of a full join.
+ *
+ * <p>It is triggered by queries such as
+ * {@code SELECT * FROM FACT_TABLE JOIN LOOKUP_TABLE ON ...}, where {@code LOOKUP_TABLE}
+ * implements this interface and {@code FACT_TABLE} does not.
+ */
+@Experimental
+public interface BeamSqlSeekableTable extends Serializable {
+  /**
+   * Returns the records of this table that match the given join key.
+   *
+   * @param lookupSubRecord a record that holds only the join-key values, one per equality in
+   *     the join condition, whose fields are named after this table's join columns
+   * @return the matching records; these are appended to the fact row, so they are expected to
+   *     carry this table's full row type. Return an empty list when nothing matches. Never
+   *     return {@code null}.
+   */
+  List<BeamRecord> seekRecord(BeamRecord lookupSubRecord);
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
index cc26aa6672e..203a739dc59 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRel.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
+import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -26,6 +27,8 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
+import org.apache.beam.sdk.extensions.sql.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamJoinTransforms;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
@@ -98,9 +101,14 @@ public BeamJoinRel(RelOptCluster cluster, RelTraitSet
traits, RelNode left, RelN
throws Exception {
BeamRelNode leftRelNode = BeamSqlRelUtils.getBeamRelInput(left);
BeamRecordSqlType leftRowType =
CalciteUtils.toBeamRowType(left.getRowType());
- PCollection<BeamRecord> leftRows =
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
-
final BeamRelNode rightRelNode = BeamSqlRelUtils.getBeamRelInput(right);
+
+ if (!seekable(leftRelNode, sqlEnv) && seekable(rightRelNode, sqlEnv)) {
+ return joinAsLookup(leftRelNode, rightRelNode, inputPCollections, sqlEnv)
+
.setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());
+ }
+
+ PCollection<BeamRecord> leftRows =
leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
PCollection<BeamRecord> rightRows =
rightRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
String stageName = BeamSqlRelUtils.getStageName(this);
@@ -295,4 +303,37 @@ private BeamRecord buildNullRow(BeamRelNode relNode) {
return new Pair<>(leftIndex, rightIndex);
}
+
+  /**
+   * Builds this join as a per-row lookup. The left input is built as a regular pipeline, and
+   * each of its rows is joined with the rows returned by the seekable table behind the right
+   * input.
+   */
+  private PCollection<BeamRecord> joinAsLookup(BeamRelNode leftRelNode,
+      BeamRelNode rightRelNode, PCollectionTuple inputPCollections, BeamSqlEnv sqlEnv)
+      throws Exception {
+    PCollection<BeamRecord> factStream =
+        leftRelNode.buildBeamPipeline(inputPCollections, sqlEnv);
+    BeamSqlSeekableTable seekableTable = getSeekableTableFromRelNode(rightRelNode, sqlEnv);
+
+    return factStream.apply("join_as_lookup",
+        new BeamJoinTransforms.JoinAsLookup(condition, seekableTable,
+            CalciteUtils.toBeamRowType(rightRelNode.getRowType()),
+            CalciteUtils.toBeamRowType(leftRelNode.getRowType()).getFieldCount()));
+  }
+
+  /**
+   * Returns the {@link BeamSqlSeekableTable} that {@code relNode} reads from.
+   *
+   * @throws IllegalStateException if {@code relNode} is not backed by a seekable table
+   */
+  private BeamSqlSeekableTable getSeekableTableFromRelNode(BeamRelNode relNode,
+      BeamSqlEnv sqlEnv) {
+    BeamSqlTable sourceTable = findSourceTable(relNode, sqlEnv);
+    if (!(sourceTable instanceof BeamSqlSeekableTable)) {
+      throw new IllegalStateException(
+          "Relational node " + relNode + " is not backed by a BeamSqlSeekableTable");
+    }
+    return (BeamSqlSeekableTable) sourceTable;
+  }
+
+  /**
+   * Checks whether {@code relNode} reads directly from a table that implements
+   * {@link BeamSqlSeekableTable}.
+   */
+  private boolean seekable(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
+    return findSourceTable(relNode, sqlEnv) instanceof BeamSqlSeekableTable;
+  }
+
+  /**
+   * Resolves the table registered in {@code sqlEnv} that {@code relNode} reads from.
+   *
+   * @return the table, or {@code null} if {@code relNode} is not a {@link BeamIOSourceRel} or
+   *     {@code sqlEnv} returns no table for its qualified name
+   */
+  private static BeamSqlTable findSourceTable(BeamRelNode relNode, BeamSqlEnv sqlEnv) {
+    if (!(relNode instanceof BeamIOSourceRel)) {
+      return null;
+    }
+    BeamIOSourceRel srcRel = (BeamIOSourceRel) relNode;
+    String tableName = Joiner.on('.').join(srcRel.getTable().getQualifiedName());
+    return sqlEnv.findTable(tableName);
+  }
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
index 3c6b20f1e47..fb578b2c55c 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamJoinTransforms.java
@@ -24,12 +24,19 @@
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSqlRecordHelper;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.Pair;
/**
@@ -158,4 +165,78 @@ private static BeamRecord
combineTwoRowsIntoOneHelper(BeamRecord leftRow,
fieldValues.addAll(rightRow.getDataValues());
return new BeamRecord(type, fieldValues);
}
+
+  /**
+   * Transform that executes a join as a lookup. Each fact row is joined with the rows that the
+   * {@link BeamSqlSeekableTable} returns for that row's join key.
+   *
+   * <p>Only equi-join conditions are supported: a single {@code =}, or an {@code AND} of
+   * {@code =} comparisons, each between a fact-table column and a lookup-table column (in
+   * either order). A fact row for which the lookup returns no record produces no output, so the
+   * result has inner-join semantics.
+   */
+  public static class JoinAsLookup
+      extends PTransform<PCollection<BeamRecord>, PCollection<BeamRecord>> {
+    BeamSqlSeekableTable seekableTable;
+    BeamRecordSqlType lkpRowType;
+    /** Row type of the key record passed to {@link BeamSqlSeekableTable#seekRecord}. */
+    BeamRecordSqlType joinSubsetType;
+    /** Indexes of the fact-row fields that form the join key, in key order. */
+    List<Integer> factJoinIdx;
+
+    /**
+     * Creates the transform and resolves the join-key mapping from {@code joinCondition}.
+     *
+     * @param joinCondition join condition over the fact columns followed by the lookup columns
+     * @param seekableTable table used to look up matching rows
+     * @param lkpRowType row type of the lookup table
+     * @param factTableColSize number of fields in a fact row; lookup-column indexes in
+     *     {@code joinCondition} are offset by this amount
+     * @throws UnsupportedOperationException if {@code joinCondition} is not a supported equi-join
+     */
+    public JoinAsLookup(RexNode joinCondition, BeamSqlSeekableTable seekableTable,
+        BeamRecordSqlType lkpRowType, int factTableColSize) {
+      this.seekableTable = seekableTable;
+      this.lkpRowType = lkpRowType;
+      joinFieldsMapping(joinCondition, factTableColSize);
+    }
+
+    /** Fills {@link #factJoinIdx} and {@link #joinSubsetType} from the join condition. */
+    private void joinFieldsMapping(RexNode joinCondition, int factTableColSize) {
+      factJoinIdx = new ArrayList<>();
+      List<String> lkpJoinFieldsName = new ArrayList<>();
+      List<Integer> lkpJoinFieldsType = new ArrayList<>();
+
+      // The condition is either a single equality or a conjunction of equalities.
+      RexCall call = asCall(joinCondition);
+      List<RexNode> equalities = new ArrayList<>();
+      if ("AND".equals(call.getOperator().getName())) {
+        equalities.addAll(call.getOperands());
+      } else {
+        equalities.add(call);
+      }
+
+      for (RexNode equality : equalities) {
+        RexCall eqCall = asCall(equality);
+        // Each conjunct must itself be an equality; otherwise e.g. '>' would be treated as '='.
+        if (!"=".equals(eqCall.getOperator().getName())) {
+          throw new UnsupportedOperationException(
+              "Operator " + eqCall.getOperator().getName() + " is not supported in join condition");
+        }
+        int first = inputIndex(eqCall.getOperands().get(0));
+        int second = inputIndex(eqCall.getOperands().get(1));
+        // Fact columns come before lookup columns, so accept the two sides in either order.
+        int factIdx = Math.min(first, second);
+        int lkpJoinIdx = Math.max(first, second) - factTableColSize;
+        if (factIdx >= factTableColSize || lkpJoinIdx < 0) {
+          throw new UnsupportedOperationException(
+              "Join condition " + eqCall + " must compare a fact column with a lookup column");
+        }
+        factJoinIdx.add(factIdx);
+        lkpJoinFieldsName.add(lkpRowType.getFieldNameByIndex(lkpJoinIdx));
+        lkpJoinFieldsType.add(lkpRowType.getFieldTypeByIndex(lkpJoinIdx));
+      }
+
+      joinSubsetType = BeamRecordSqlType.create(lkpJoinFieldsName, lkpJoinFieldsType);
+    }
+
+    /** Casts {@code node} to a {@link RexCall}, rejecting any other kind of join condition. */
+    private static RexCall asCall(RexNode node) {
+      if (!(node instanceof RexCall)) {
+        throw new UnsupportedOperationException("Join condition " + node + " is not supported");
+      }
+      return (RexCall) node;
+    }
+
+    /** Returns the input field index referenced by {@code operand}, which must be a column. */
+    private static int inputIndex(RexNode operand) {
+      if (!(operand instanceof RexInputRef)) {
+        throw new UnsupportedOperationException(
+            "Only column references are supported in join condition, but got " + operand);
+      }
+      return ((RexInputRef) operand).getIndex();
+    }
+
+    @Override
+    public PCollection<BeamRecord> expand(PCollection<BeamRecord> input) {
+      return input.apply("join_as_lookup", ParDo.of(new DoFn<BeamRecord, BeamRecord>() {
+        @ProcessElement
+        public void processElement(ProcessContext context) {
+          BeamRecord factRow = context.element();
+          BeamRecord joinSubRow = extractJoinSubRow(factRow);
+          List<BeamRecord> lookupRows = seekableTable.seekRecord(joinSubRow);
+          // No matching rows means no output for this fact row (inner-join semantics).
+          for (BeamRecord lr : lookupRows) {
+            context.output(combineTwoRowsIntoOneHelper(factRow, lr));
+          }
+        }
+
+        /** Copies the join-key fields of {@code factRow} into a record of the key row type. */
+        private BeamRecord extractJoinSubRow(BeamRecord factRow) {
+          List<Object> joinSubsetValues = new ArrayList<>();
+          for (int i : factJoinIdx) {
+            joinSubsetValues.add(factRow.getFieldValue(i));
+          }
+          return new BeamRecord(joinSubsetType, joinSubsetValues);
+        }
+      }));
+    }
+  }
+
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
index 156d7ff5b01..0cd1a2a95db 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlApiSurfaceTest.java
@@ -52,6 +52,7 @@ public void testSdkApiSurface() throws Exception {
.includingClass(BeamSqlUdf.class)
.includingClass(BeamRecordSqlType.class)
.includingClass(BeamSqlRecordHelper.class)
+ .includingClass(BeamSqlSeekableTable.class)
.pruningPrefix("java")
.pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*Test")
.pruningPattern("org[.]apache[.]beam[.]sdk[.]extensions[.]sql[.].*TestBase");
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
index c5145ec1a6d..c6053391f91 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamJoinRelUnboundedVsBoundedTest.java
@@ -19,17 +19,26 @@
package org.apache.beam.sdk.extensions.sql.impl.rel;
import java.sql.Types;
+import java.util.Arrays;
import java.util.Date;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
+import org.apache.beam.sdk.extensions.sql.BeamSqlSeekableTable;
import org.apache.beam.sdk.extensions.sql.TestUtils;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BaseBeamTable;
+import org.apache.beam.sdk.extensions.sql.impl.schema.BeamIOType;
import
org.apache.beam.sdk.extensions.sql.impl.transform.BeamSqlOutputToConsoleFn;
import org.apache.beam.sdk.extensions.sql.mock.MockedBoundedTable;
import org.apache.beam.sdk.extensions.sql.mock.MockedUnboundedTable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
import org.joda.time.Duration;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -84,8 +93,40 @@ public static void prepare() {
1, "james",
2, "bond"
));
+ BEAM_SQL_ENV.registerTable("SITE_LKP", new SiteLookupTable(
+ TestUtils.buildBeamSqlRowType(Types.INTEGER, "site_id", Types.VARCHAR,
"site_name")));
}
+ /**
+ * Test table for JOIN-AS-LOOKUP.
+ *
+ */
+ public static class SiteLookupTable extends BaseBeamTable implements
BeamSqlSeekableTable{
+
+ public SiteLookupTable(BeamRecordSqlType beamRecordSqlType) {
+ super(beamRecordSqlType);
+ }
+
+ @Override
+ public BeamIOType getSourceType() {
+ return BeamIOType.BOUNDED;
+ }
+
+ @Override
+ public PCollection<BeamRecord> buildIOReader(Pipeline pipeline) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PTransform<? super PCollection<BeamRecord>, PDone> buildIOWriter() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<BeamRecord> seekRecord(BeamRecord lookupSubRecord) {
+ return Arrays.asList(new BeamRecord(getRowType(), 1, "SITE1"));
+ }
+ }
@Test
public void testInnerJoin_unboundedTableOnTheLeftSide() throws Exception {
String sql = "SELECT o1.order_id, o1.sum_site_id, o2.buyer FROM "
@@ -237,4 +278,26 @@ public void testFullOuterJoinError() throws Exception {
compilePipeline(sql, pipeline, BEAM_SQL_ENV);
pipeline.run();
}
+
+  @Test
+  public void testJoinAsLookup() throws Exception {
+    // SITE_LKP is registered as a BeamSqlSeekableTable, so this join exercises JOIN-AS-LOOKUP.
+    String sql = "SELECT o1.order_id, o2.site_name FROM "
+        + " ORDER_DETAILS o1 " + " JOIN SITE_LKP o2 "
+        + " on " + " o1.site_id=o2.site_id "
+        + " WHERE o1.site_id=1";
+    PCollection<BeamRecord> result = compilePipeline(sql, pipeline, BEAM_SQL_ENV);
+
+    PAssert
+        .that(result.apply(ParDo.of(new TestUtils.BeamSqlRow2StringDoFn())))
+        .containsInAnyOrder(TestUtils.RowsBuilder
+            .of(Types.INTEGER, "order_id", Types.VARCHAR, "site_name")
+            .addRows(1, "SITE1")
+            .getStringRows());
+
+    pipeline.run();
+  }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> convert a join into lookup
> --------------------------
>
> Key: BEAM-3171
> URL: https://issues.apache.org/jira/browse/BEAM-3171
> Project: Beam
> Issue Type: New Feature
> Components: dsl-sql
> Reporter: Xu Mingmin
> Assignee: Xu Mingmin
> Labels: experimental
>
> We mostly use BeamSQL to run streaming jobs, and have added a join_as_lookup
> improvement (in an internal branch) to cover the streaming-to-batch case (similar to
> [1]). I could submit a PR as an experimental feature if people are interested.
> The rough solution: if one input of a join node implements
> {{BeamSeekableTable}} and the other does not, the join node is converted
> to a fact-lookup operation.
> Ref:
> [1]
> https://docs.google.com/document/d/1B-XnUwXh64lbswRieckU0BxtygSV58hysqZbpZmk03A/edit?usp=sharing
>
> [~xumingming] [~takidau], any comments?
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)