This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new cb922bb [FLINK-12971][table-planner-blink] Remove the constraint that lookup join needs a primary key or index key
cb922bb is described below
commit cb922bbfa0a26e147638d22aeceaf3e83b01e1d3
Author: Jark Wu <[email protected]>
AuthorDate: Tue Jun 25 17:58:56 2019 +0800
[FLINK-12971][table-planner-blink] Remove the constraint that lookup join needs a primary key or index key
This closes #8878
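For example, the lookup join below, which the removed negative tests used to expect to
fail because D.age is not part of a primary key or index, is now expected to plan as
long as the join has an equality condition on the temporal table. A minimal sketch in
the style of the planner tests, assuming the streamTestUtil()/verifyPlan helpers and
the MyTable/temporalTest registrations from LookupJoinTest:

    // join key D.age is neither a primary key nor an indexed column;
    // before this commit the planner threw a TableException for this query
    val streamUtil = streamTestUtil()
    val sql =
      "SELECT * FROM MyTable AS T JOIN temporalTest " +
        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.age"
    streamUtil.verifyPlan(sql)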
---
.../apache/flink/table/sources/DefinedIndexes.java | 40 ------
.../flink/table/sources/DefinedPrimaryKey.java | 44 ------
.../org/apache/flink/table/sources/TableIndex.java | 149 ---------------------
.../table/plan/nodes/common/CommonLookupJoin.scala | 81 ++---------
.../flink/table/plan/util/LookupJoinUtil.scala | 30 -----
.../table/plan/batch/sql/join/LookupJoinTest.scala | 13 +-
.../plan/stream/sql/join/LookupJoinTest.scala | 39 +-----
.../runtime/batch/sql/join/LookupJoinITCase.scala | 2 -
.../runtime/stream/sql/AsyncLookupJoinITCase.scala | 2 -
.../runtime/stream/sql/LookupJoinITCase.scala | 2 -
.../utils/InMemoryLookupableTableSource.scala | 102 ++------------
11 files changed, 25 insertions(+), 479 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java
deleted file mode 100644
index 38addcc..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import java.util.Collection;
-
-/**
- * The {@link DefinedIndexes} interface can extends a {@link TableSource} to specify the
- * indexes meta information.
- *
- * <p>An Index can be a Unique Index or Normal Index. An Unique Index is similar to primary
- * key which defines a column or a group of columns that uniquely identifies each row in
- * a table or stream. An Normal Index is an index on the defined columns used to accelerate
- * querying.
- */
-public interface DefinedIndexes {
-
- /**
- * Returns the list of {@link TableIndex}s. Returns empty collection or null if no
- * index is exist.
- */
- Collection<TableIndex> getIndexes();
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java
deleted file mode 100644
index 8794efd..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-
-/**
- * The {@link DefinedPrimaryKey} interface can extends a {@link TableSource} to specify the
- * primary key meta information.
- *
- * <p>A primary key is a column or a group of columns that uniquely identifies each row in
- * a table or stream.
- *
- * <p>NOTE: Although a primary key usually has an Unique Index, if you have defined
- * a primary key, there is no need to define a same index in {@link DefinedIndexes} again.
- */
-public interface DefinedPrimaryKey {
-
- /**
- * Returns the column names of the primary key. Returns null if no primary key existed
- * in the {@link TableSource}.
- */
- @Nullable
- List<String> getPrimaryKeyColumns();
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java
deleted file mode 100644
index 1495e2c..0000000
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An Index meta information of a Table.
- */
-public class TableIndex {
-
- /**
- * Index type, currently only support NORMAL INDEX, and UNIQUE INDEX.
- */
- public enum IndexType {
- NORMAL,
- UNIQUE
- }
-
- private final String indexName;
- private final IndexType indexType;
- private final List<String> indexedColumns;
- private final String indexComment;
-
- private TableIndex(String indexName, IndexType indexType, List<String> indexedColumns, String indexComment) {
- this.indexName = indexName;
- this.indexType = indexType;
- this.indexedColumns = indexedColumns;
- this.indexComment = indexComment;
- }
-
- /**
- * Returns name of the Index.
- *
- * @return an optional name of the index.
- */
- public Optional<String> getIndexName() {
- return Optional.ofNullable(indexName);
- }
-
- /**
- * Returns the column names of the index.
- */
- public List<String> getIndexedColumns() {
- return indexedColumns;
- }
-
- /**
- * Returns the type of the index.
- */
- public IndexType getIndexType() {
- return indexType;
- }
-
- /**
- * Returns comment of the index.
- * @return an optional comment of the index.
- */
- public Optional<String> getIndexComment() {
- return Optional.ofNullable(indexComment);
- }
-
- /**
- * Returns a new builder that builds a {@link TableIndex}.
- *
- * <p>For example:
- * <pre>
- * TableIndex.builder()
- * .uniqueIndex()
- * .indexedColumns("user_id", "user_name")
- * .name("idx_1")
- * .build();
- * </pre>
- */
- public static Builder builder() {
- return new Builder();
- }
-
- /**
- * A builder used to construct a {@link TableIndex}.
- */
- public static class Builder {
- private String indexName;
- private IndexType indexType;
- private List<String> indexedColumns;
- private String indexComment;
-
- public Builder normalIndex() {
- checkState(indexType == null, "IndexType has been set.");
- this.indexType = IndexType.NORMAL;
- return this;
- }
-
- public Builder uniqueIndex() {
- checkState(indexType == null, "IndexType has been set.");
- this.indexType = IndexType.UNIQUE;
- return this;
- }
-
- public Builder name(String name) {
- this.indexName = name;
- return this;
- }
-
- public Builder indexedColumns(List<String> indexedColumns) {
- this.indexedColumns = indexedColumns;
- return this;
- }
-
- public Builder indexedColumns(String... indexedColumns) {
- this.indexedColumns = Arrays.asList(indexedColumns);
- return this;
- }
-
- public Builder comment(String comment) {
- this.indexComment = comment;
- return this;
- }
-
- public TableIndex build() {
- checkNotNull(indexedColumns);
- checkNotNull(indexType);
- return new TableIndex(indexName, indexType, indexedColumns, indexComment);
- }
-
- }
-
-}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
index a09c85b..cb6034a 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
@@ -35,8 +35,7 @@ import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.table.plan.util.LookupJoinUtil._
import org.apache.flink.table.plan.util.{JoinTypeUtil, RelExplainUtil}
import org.apache.flink.table.runtime.join.lookup.{AsyncLookupJoinRunner, AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
-import org.apache.flink.table.sources.TableIndex.IndexType
-import org.apache.flink.table.sources.{LookupableTableSource, TableIndex, TableSource}
+import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
import org.apache.flink.table.types.ClassLogicalTypeConverter
import org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType
import org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -85,19 +84,12 @@ abstract class CommonLookupJoin(
with FlinkRelNode {
val joinKeyPairs: Array[IntPair] = getTemporalTableJoinKeyPairs(joinInfo, calcOnTemporalTable)
- val indexKeys: Array[TableIndex] = getTableIndexes(tableSource)
// all potential index keys, mapping from field index in table source to LookupKey
val allLookupKeys: Map[Int, LookupKey] = analyzeLookupKeys(
cluster.getRexBuilder,
joinKeyPairs,
- indexKeys,
tableSource.getTableSchema,
calcOnTemporalTable)
- // the matched best lookup fields which is in defined order, maybe empty
- val matchedLookupFields: Option[Array[Int]] = findMatchedIndex(
- indexKeys,
- tableSource.getTableSchema,
- allLookupKeys)
override def deriveRowType(): RelDataType = {
val flinkTypeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
@@ -171,12 +163,10 @@ abstract class CommonLookupJoin(
tableSource,
inputRowType,
tableSourceRowType,
- indexKeys,
allLookupKeys,
- matchedLookupFields,
joinType)
- val lookupFieldsInOrder = matchedLookupFields.get
+ val lookupFieldsInOrder = allLookupKeys.keys.toList.sorted.toArray
val lookupFieldNamesInOrder = lookupFieldsInOrder.map(tableSchema.getFieldNames()(_))
val lookupFieldTypesInOrder = lookupFieldsInOrder
.map(tableSchema.getFieldDataTypes()(_)).map(fromDataTypeToLogicalType)
@@ -486,13 +476,8 @@ abstract class CommonLookupJoin(
def analyzeLookupKeys(
rexBuilder: RexBuilder,
joinKeyPairs: Array[IntPair],
- tableIndexes: Array[TableIndex],
temporalTableSchema: TableSchema,
calcOnTemporalTable: Option[RexProgram]): Map[Int, LookupKey] = {
- val fieldNames = temporalTableSchema.getFieldNames
- val allIndexFields = tableIndexes
- .flatMap(_.getIndexedColumns.asScala.map(fieldNames.indexOf(_)))
- .toSet
// field_index_in_table_source => constant_lookup_key
val constantLookupKeys = new mutable.HashMap[Int, ConstantLookupKey]
// analyze constant lookup keys
@@ -502,42 +487,12 @@ abstract class CommonLookupJoin(
cluster.getRexBuilder,
program.expandLocalRef(program.getCondition))
// presume 'A = 1 AND A = 2' will be reduced to ALWAYS_FALSE
- extractConstantFieldsFromEquiCondition(condition, allIndexFields, constantLookupKeys)
+ extractConstantFieldsFromEquiCondition(condition, constantLookupKeys)
}
val fieldRefLookupKeys = joinKeyPairs.map(p => (p.target, FieldRefLookupKey(p.source)))
(constantLookupKeys ++ fieldRefLookupKeys).toMap
}
- private def findMatchedIndex(
- tableIndexes: Array[TableIndex],
- temporalTableSchema: TableSchema,
- allLookupKeys: Map[Int, LookupKey]): Option[Array[Int]] = {
-
- val fieldNames = temporalTableSchema.getFieldNames
-
- // [(indexFields, isUniqueIndex)]
- val indexes: Array[(Array[Int], Boolean)] = tableIndexes.map { tableIndex =>
- val indexFields = tableIndex.getIndexedColumns.asScala.map(fieldNames.indexOf(_)).toArray
- val isUniqueIndex = tableIndex.getIndexType.equals(IndexType.UNIQUE)
- (indexFields, isUniqueIndex)
- }
-
- val matchedIndexes = indexes.filter(_._1.forall(allLookupKeys.contains))
- if (matchedIndexes.length > 1) {
- // find a best one, we prefer a unique index key here
- val uniqueIndex = matchedIndexes.find(_._2).map(_._1)
- if (uniqueIndex.isDefined) {
- uniqueIndex
- } else {
- // all the matched index are normal index, select anyone from matched indexes
- matchedIndexes.map(_._1).headOption
- }
- } else {
- // select anyone from matched indexes
- matchedIndexes.map(_._1).headOption
- }
- }
-
// ----------------------------------------------------------------------------------------
// Physical Optimization Utilities
// ----------------------------------------------------------------------------------------
@@ -569,17 +524,15 @@ abstract class CommonLookupJoin(
private def extractConstantFieldsFromEquiCondition(
condition: RexNode,
- allIndexFields: Set[Int],
constantFieldMap: mutable.HashMap[Int, ConstantLookupKey]): Unit = condition match {
case c: RexCall if c.getKind == SqlKind.AND =>
- c.getOperands.asScala.foreach(r => extractConstantField(r, allIndexFields, constantFieldMap))
- case rex: RexNode => extractConstantField(rex, allIndexFields, constantFieldMap)
+ c.getOperands.asScala.foreach(r => extractConstantField(r, constantFieldMap))
+ case rex: RexNode => extractConstantField(rex, constantFieldMap)
case _ =>
}
private def extractConstantField(
pred: RexNode,
- allIndexFields: Set[Int],
constantFieldMap: mutable.HashMap[Int, ConstantLookupKey]): Unit = pred match {
case c: RexCall if c.getKind == SqlKind.EQUALS =>
val left = c.getOperands.get(0)
@@ -588,10 +541,8 @@ abstract class CommonLookupJoin(
case (literal: RexLiteral, ref: RexInputRef) => (ref, literal)
case (ref: RexInputRef, literal: RexLiteral) => (ref, literal)
}
- if (allIndexFields.contains(inputRef.getIndex)) {
- val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
- constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, literal))
- }
+ val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
+ constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, literal))
case _ => // ignore
}
@@ -603,23 +554,14 @@ abstract class CommonLookupJoin(
tableSource: TableSource[_],
inputRowType: RowType,
tableSourceRowType: RowType,
- tableIndexes: Array[TableIndex],
allLookupKeys: Map[Int, LookupKey],
- matchedLookupFields: Option[Array[Int]],
joinType: JoinRelType): Unit = {
- // checked PRIMARY KEY or (UNIQUE) INDEX is defined.
- if (tableIndexes.isEmpty) {
- throw new TableException(
- s"Temporal table join requires table [${tableSource.explainSource()}]
defines " +
- s"a PRIMARY KEY or (UNIQUE) INDEX.")
- }
-
// check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
- if (allLookupKeys.isEmpty || matchedLookupFields.isEmpty) {
+ if (allLookupKeys.isEmpty) {
throw new TableException(
- "Temporal table join requires an equality condition on ALL fields of "
+
- s"table [${tableSource.explainSource()}]'s PRIMARY KEY or (UNIQUE)
INDEX(s).")
+ "Temporal table join requires an equality condition on fields of " +
+ s"table [${tableSource.explainSource()}].")
}
if (!tableSource.isInstanceOf[LookupableTableSource[_]]) {
@@ -627,9 +569,8 @@ abstract class CommonLookupJoin(
s"implement LookupableTableSource interface if it is used in temporal
table join.")
}
- val checkedLookupFields = matchedLookupFields.get
- val lookupKeyPairs = joinKeyPairs.filter(p => checkedLookupFields.contains(p.target))
+ val lookupKeyPairs = joinKeyPairs.filter(p => allLookupKeys.contains(p.target))
val leftKeys = lookupKeyPairs.map(_.source)
val rightKeys = lookupKeyPairs.map(_.target)
val leftKeyTypes = leftKeys.map(inputRowType.getTypeAt)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
index 8773184..7cdfb40 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
@@ -19,11 +19,8 @@
package org.apache.flink.table.plan.util
import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.table.sources.{DefinedIndexes, DefinedPrimaryKey, TableIndex, TableSource}
import org.apache.flink.table.types.logical.LogicalType
-import scala.collection.JavaConverters._
-
/**
* Utilities for temporal table join
*/
@@ -47,31 +44,4 @@ object LookupJoinUtil {
*/
case class FieldRefLookupKey(index: Int) extends LookupKey
- /**
- * Gets [[TableIndex]]s from a [[TableSource]]. This will combine primary key information
- * of [[DefinedPrimaryKey]] and indexes information of [[DefinedIndexes]].
- */
- def getTableIndexes(table: TableSource[_]): Array[TableIndex] = {
- val indexes: Array[TableIndex] = table match {
- case t: DefinedIndexes if t.getIndexes != null => t.getIndexes.asScala.toArray
- case _ => Array()
- }
-
- // add primary key into index list because primary key is an index too
- table match {
- case t: DefinedPrimaryKey =>
- val primaryKey = t.getPrimaryKeyColumns
- if (primaryKey != null && !primaryKey.isEmpty) {
- val primaryKeyIndex = TableIndex.builder()
- .uniqueIndex()
- .indexedColumns(primaryKey)
- .build()
- indexes ++ Array(primaryKeyIndex)
- } else {
- indexes
- }
- case _ => indexes
- }
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 354c6f4..017e1a1 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -55,15 +55,6 @@ class LookupJoinTest extends TableTestBase {
classOf[TableException]
)
- // can't on non-key fields
- expectExceptionThrown(
- "SELECT * FROM MyTable AS T JOIN temporalTest " +
- "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.age",
- "Temporal table join requires an equality condition on ALL fields of
table " +
- "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE)
INDEX(s).",
- classOf[TableException]
- )
-
// only support left or inner join
expectExceptionThrown(
"SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
@@ -76,8 +67,8 @@ class LookupJoinTest extends TableTestBase {
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 1",
- "Temporal table join requires an equality condition on ALL fields of
table " +
- "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE)
INDEX(s).",
+ "Temporal table join requires an equality condition on fields of table "
+
+ "[TestTemporalTable(id, name, age)].",
classOf[TableException]
)
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index b4d9a58..790bc8c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -35,7 +35,6 @@ import org.junit.Test
import _root_.java.lang.{Long => JLong}
import _root_.java.sql.Timestamp
-import _root_.java.util
import _root_.java.util.concurrent.CompletableFuture
import _root_.java.util.{Collection => JCollection}
@@ -71,15 +70,6 @@ class LookupJoinTest extends TableTestBase with Serializable {
classOf[TableException]
)
- // can't on non-key fields
- expectExceptionThrown(
- "SELECT * FROM MyTable AS T JOIN temporalTest " +
- "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.age",
- "Temporal table join requires an equality condition on ALL fields of
table " +
- "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE)
INDEX(s).",
- classOf[TableException]
- )
-
// only support left or inner join
expectExceptionThrown(
"SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
@@ -92,8 +82,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
expectExceptionThrown(
"SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
"FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 2",
- "Temporal table join requires an equality condition on ALL fields of
table " +
- "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE)
INDEX(s).",
+ "Temporal table join requires an equality condition on fields of table "
+
+ "[TestTemporalTable(id, name, age)].",
classOf[TableException]
)
@@ -354,8 +344,7 @@ class LookupJoinTest extends TableTestBase with Serializable {
class TestTemporalTable
- extends LookupableTableSource[BaseRow]
- with DefinedIndexes {
+ extends LookupableTableSource[BaseRow] {
val fieldNames: Array[String] = Array("id", "name", "age")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.INT)
@@ -379,26 +368,13 @@ class TestTemporalTable
}
override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
-
- override def getIndexes: util.Collection[TableIndex] = {
- val index1 = TableIndex.builder()
- .normalIndex()
- .indexedColumns("name")
- .build()
- val index2 = TableIndex.builder()
- .uniqueIndex()
- .indexedColumns("id")
- .build()
- util.Arrays.asList(index1, index2)
- }
}
class TestInvalidTemporalTable private(
async: Boolean,
fetcher: TableFunction[_],
asyncFetcher: AsyncTableFunction[_])
- extends LookupableTableSource[BaseRow]
- with DefinedIndexes {
+ extends LookupableTableSource[BaseRow] {
val fieldNames: Array[String] = Array("id", "name", "age", "ts")
val fieldTypes: Array[TypeInformation[_]] = Array(
@@ -430,13 +406,6 @@ class TestInvalidTemporalTable private(
}
override def isAsyncEnabled: Boolean = async
-
- override def getIndexes: util.Collection[TableIndex] = {
- util.Collections.singleton(TableIndex.builder()
- .uniqueIndex()
- .indexedColumns("id", "name", "ts")
- .build())
- }
}
class InvalidTableFunctionResultType extends TableFunction[String] {
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
index fef6522..a8ffa15 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -44,7 +44,6 @@ class LookupJoinITCase extends BatchTestBase {
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .primaryKey("id")
.build()
val userAsyncTableSource = InMemoryLookupableTableSource.builder()
@@ -52,7 +51,6 @@ class LookupJoinITCase extends BatchTestBase {
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .primaryKey("id")
.enableAsync()
.build()
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 8575421..c792405 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -52,7 +52,6 @@ class AsyncLookupJoinITCase(backend: StateBackendMode)
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .primaryKey("id")
.enableAsync()
.build()
@@ -61,7 +60,6 @@ class AsyncLookupJoinITCase(backend: StateBackendMode)
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .addUniqueIndex("id", "name")
.enableAsync()
.build()
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
index b55b332..74c6c51 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
@@ -55,7 +55,6 @@ class LookupJoinITCase extends StreamingTestBase {
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .primaryKey("id")
.build()
val userTableSourceWith2Keys = InMemoryLookupableTableSource.builder()
@@ -63,7 +62,6 @@ class LookupJoinITCase extends StreamingTestBase {
.field("age", Types.INT)
.field("id", Types.LONG)
.field("name", Types.STRING)
- .addUniqueIndex("id", "name")
.build()
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 29a116b..2a65683 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.functions.{AsyncTableFunction, FunctionContext, TableFunction}
import org.apache.flink.table.runtime.utils.InMemoryLookupableTableSource.{InMemoryAsyncLookupFunction, InMemoryLookupFunction}
-import org.apache.flink.table.sources.TableIndex.IndexType
import org.apache.flink.table.sources._
import org.apache.flink.types.Row
import org.apache.flink.util.Preconditions
@@ -46,23 +45,8 @@ class InMemoryLookupableTableSource(
fieldNames: Array[String],
fieldTypes: Array[TypeInformation[_]],
data: List[Row],
- primaryKey: Option[Array[String]],
- tableIndexes: Array[TableIndex],
asyncEnabled: Boolean)
- extends LookupableTableSource[Row]
- with DefinedPrimaryKey
- with DefinedIndexes {
-
- lazy val uniqueKeys: Array[Array[String]] = {
- val keys = new mutable.ArrayBuffer[Array[String]]()
- if (getPrimaryKeyColumns != null) {
- keys += getPrimaryKeyColumns.asScala.toArray
- }
- getIndexes.asScala
- .filter(_.getIndexType == IndexType.UNIQUE)
- .foreach(keys += _.getIndexedColumns.asScala.toArray)
- keys.toArray
- }
+ extends LookupableTableSource[Row] {
val resourceCounter = new AtomicInteger(0)
@@ -75,26 +59,15 @@ class InMemoryLookupableTableSource(
}
private def convertDataToMap(lookupKeys: Array[String]): Map[Row, List[Row]] = {
- val isUniqueKey = uniqueKeys.contains(lookupKeys)
val lookupFieldIndexes = lookupKeys.map(fieldNames.indexOf(_))
val map = mutable.HashMap[Row, List[Row]]()
- if (isUniqueKey) {
- data.foreach { row =>
- val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
- val oldValue = map.put(key, List(row))
- if (oldValue.isDefined) {
- throw new IllegalStateException("data contains duplicate keys.")
- }
- }
- } else {
- data.foreach { row =>
- val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
- val oldValue = map.get(key)
- if (oldValue.isDefined) {
- map.put(key, oldValue.get ++ List(row))
- } else {
- map.put(key, List(row))
- }
+ data.foreach { row =>
+ val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
+ val oldValue = map.get(key)
+ if (oldValue.isDefined) {
+ map.put(key, oldValue.get ++ List(row))
+ } else {
+ map.put(key, List(row))
}
}
map.toMap
@@ -106,13 +79,6 @@ class InMemoryLookupableTableSource(
override def getTableSchema: TableSchema = new TableSchema(fieldNames, fieldTypes)
- override def getPrimaryKeyColumns: util.List[String] = primaryKey match {
- case Some(pk) => pk.toList.asJava
- case None => null // return null to indicate no primary key is defined.
- }
-
- override def getIndexes: util.Collection[TableIndex] = tableIndexes.toList.asJava
-
@VisibleForTesting
def getResourceCounter: Int = resourceCounter.get()
@@ -136,8 +102,6 @@ object InMemoryLookupableTableSource {
* .field("age", Types.INT)
* .field("id", Types.LONG)
* .field("name", Types.STRING)
- * .primaryKey("id")
- * .addNormalIndex("name")
* .enableAsync()
* .build()
* }}}
@@ -163,16 +127,12 @@ object InMemoryLookupableTableSource {
* .field("age", Types.INT)
* .field("id", Types.LONG)
* .field("name", Types.STRING)
- * .primaryKey("id")
- * .addNormalIndex("name")
* .enableAsync()
* .build()
* }}}
*/
class Builder {
private val schema = new mutable.LinkedHashMap[String, TypeInformation[_]]()
- private val tableIndexes = new mutable.ArrayBuffer[TableIndex]()
- private var primaryKey: Option[Array[String]] = None
private var data: List[Product] = _
private var asyncEnabled: Boolean = false
@@ -201,50 +161,6 @@ object InMemoryLookupableTableSource {
}
/**
- * Sets primary key for the table source.
- */
- def primaryKey(fields: String*): Builder = {
- if (fields.isEmpty) {
- throw new IllegalArgumentException("fields should not be empty.")
- }
- if (primaryKey != null && primaryKey.isDefined) {
- throw new IllegalArgumentException("primary key has been set.")
- }
- this.primaryKey = Some(fields.toArray)
- this
- }
-
- /**
- * Adds a normal [[TableIndex]] for the table source
- */
- def addNormalIndex(fields: String*): Builder = {
- if (fields.isEmpty) {
- throw new IllegalArgumentException("fields should not be empty.")
- }
- val index = TableIndex.builder()
- .normalIndex()
- .indexedColumns(fields: _*)
- .build()
- tableIndexes += index
- this
- }
-
- /**
- * Adds an unique [[TableIndex]] for the table source
- */
- def addUniqueIndex(fields: String*): Builder = {
- if (fields.isEmpty) {
- throw new IllegalArgumentException("fields should not be empty.")
- }
- val index = TableIndex.builder()
- .uniqueIndex()
- .indexedColumns(fields: _*)
- .build()
- tableIndexes += index
- this
- }
-
- /**
* Enables async lookup for the table source
*/
def enableAsync(): Builder = {
@@ -269,8 +185,6 @@ object InMemoryLookupableTableSource {
fieldNames,
fieldTypes,
rowData,
- primaryKey,
- tableIndexes.toArray,
asyncEnabled
)
}