This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cb922bb  [FLINK-12971][table-planner-blink] Remove the constraint that 
lookup join needs a primary key or index key
cb922bb is described below

commit cb922bbfa0a26e147638d22aeceaf3e83b01e1d3
Author: Jark Wu <[email protected]>
AuthorDate: Tue Jun 25 17:58:56 2019 +0800

    [FLINK-12971][table-planner-blink] Remove the constraint that lookup join 
needs a primary key or index key
    
    This closes #8878
---
 .../apache/flink/table/sources/DefinedIndexes.java |  40 ------
 .../flink/table/sources/DefinedPrimaryKey.java     |  44 ------
 .../org/apache/flink/table/sources/TableIndex.java | 149 ---------------------
 .../table/plan/nodes/common/CommonLookupJoin.scala |  81 ++---------
 .../flink/table/plan/util/LookupJoinUtil.scala     |  30 -----
 .../table/plan/batch/sql/join/LookupJoinTest.scala |  13 +-
 .../plan/stream/sql/join/LookupJoinTest.scala      |  39 +-----
 .../runtime/batch/sql/join/LookupJoinITCase.scala  |   2 -
 .../runtime/stream/sql/AsyncLookupJoinITCase.scala |   2 -
 .../runtime/stream/sql/LookupJoinITCase.scala      |   2 -
 .../utils/InMemoryLookupableTableSource.scala      | 102 ++------------
 11 files changed, 25 insertions(+), 479 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java
deleted file mode 100644
index 38addcc..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedIndexes.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import java.util.Collection;
-
-/**
- * The {@link DefinedIndexes} interface can extends a {@link TableSource} to 
specify the
- * indexes meta information.
- *
- * <p>An Index can be a Unique Index or Normal Index. An Unique Index is 
similar to primary
- * key which defines a column or a group of columns that uniquely identifies 
each row in
- * a table or stream. An Normal Index is an index on the defined columns used 
to accelerate
- * querying.
- */
-public interface DefinedIndexes {
-
-       /**
-        * Returns the list of {@link TableIndex}s. Returns empty collection or 
null if no
-        * index is exist.
-        */
-       Collection<TableIndex> getIndexes();
-
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java
deleted file mode 100644
index 8794efd..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/DefinedPrimaryKey.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import javax.annotation.Nullable;
-
-import java.util.List;
-
-/**
- * The {@link DefinedPrimaryKey} interface can extends a {@link TableSource} 
to specify the
- * primary key meta information.
- *
- * <p>A primary key is a column or a group of columns that uniquely identifies 
each row in
- * a table or stream.
- *
- * <p>NOTE: Although a primary key usually has an Unique Index, if you have 
defined
- * a primary key, there is no need to define a same index in {@link 
DefinedIndexes} again.
- */
-public interface DefinedPrimaryKey {
-
-       /**
-        * Returns the column names of the primary key. Returns null if no 
primary key existed
-        * in the {@link TableSource}.
-        */
-       @Nullable
-       List<String> getPrimaryKeyColumns();
-
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java
deleted file mode 100644
index 1495e2c..0000000
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/sources/TableIndex.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sources;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-/**
- * An Index meta information of a Table.
- */
-public class TableIndex {
-
-       /**
-        * Index type, currently only support NORMAL INDEX, and UNIQUE INDEX.
-        */
-       public enum IndexType {
-               NORMAL,
-               UNIQUE
-       }
-
-       private final String indexName;
-       private final IndexType indexType;
-       private final List<String> indexedColumns;
-       private final String indexComment;
-
-       private TableIndex(String indexName, IndexType indexType, List<String> 
indexedColumns, String indexComment) {
-               this.indexName = indexName;
-               this.indexType = indexType;
-               this.indexedColumns = indexedColumns;
-               this.indexComment = indexComment;
-       }
-
-       /**
-        * Returns name of the Index.
-        *
-        * @return an optional name of the index.
-        */
-       public Optional<String> getIndexName() {
-               return Optional.ofNullable(indexName);
-       }
-
-       /**
-        * Returns the column names of the index.
-        */
-       public List<String> getIndexedColumns() {
-               return indexedColumns;
-       }
-
-       /**
-        * Returns the type of the index.
-        */
-       public IndexType getIndexType() {
-               return indexType;
-       }
-
-       /**
-        * Returns comment of the index.
-        * @return an optional comment of the index.
-        */
-       public Optional<String> getIndexComment() {
-               return Optional.ofNullable(indexComment);
-       }
-
-       /**
-        * Returns a new builder that builds a {@link TableIndex}.
-        *
-        * <p>For example:
-        * <pre>
-        *     TableIndex.builder()
-        *       .uniqueIndex()
-        *       .indexedColumns("user_id", "user_name")
-        *       .name("idx_1")
-        *       .build();
-        * </pre>
-        */
-       public static Builder builder() {
-               return new Builder();
-       }
-
-       /**
-        * A builder used to construct a {@link TableIndex}.
-        */
-       public static class Builder {
-               private String indexName;
-               private IndexType indexType;
-               private List<String> indexedColumns;
-               private String indexComment;
-
-               public Builder normalIndex() {
-                       checkState(indexType == null, "IndexType has been 
set.");
-                       this.indexType = IndexType.NORMAL;
-                       return this;
-               }
-
-               public Builder uniqueIndex() {
-                       checkState(indexType == null, "IndexType has been 
set.");
-                       this.indexType = IndexType.UNIQUE;
-                       return this;
-               }
-
-               public Builder name(String name) {
-                       this.indexName = name;
-                       return this;
-               }
-
-               public Builder indexedColumns(List<String> indexedColumns) {
-                       this.indexedColumns = indexedColumns;
-                       return this;
-               }
-
-               public Builder indexedColumns(String... indexedColumns) {
-                       this.indexedColumns = Arrays.asList(indexedColumns);
-                       return this;
-               }
-
-               public Builder comment(String comment) {
-                       this.indexComment = comment;
-                       return this;
-               }
-
-               public TableIndex build() {
-                       checkNotNull(indexedColumns);
-                       checkNotNull(indexType);
-                       return new TableIndex(indexName, indexType, 
indexedColumns, indexComment);
-               }
-
-       }
-
-}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
index a09c85b..cb6034a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/nodes/common/CommonLookupJoin.scala
@@ -35,8 +35,7 @@ import org.apache.flink.table.plan.nodes.FlinkRelNode
 import org.apache.flink.table.plan.util.LookupJoinUtil._
 import org.apache.flink.table.plan.util.{JoinTypeUtil, RelExplainUtil}
 import org.apache.flink.table.runtime.join.lookup.{AsyncLookupJoinRunner, 
AsyncLookupJoinWithCalcRunner, LookupJoinRunner, LookupJoinWithCalcRunner}
-import org.apache.flink.table.sources.TableIndex.IndexType
-import org.apache.flink.table.sources.{LookupableTableSource, TableIndex, 
TableSource}
+import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
 import org.apache.flink.table.types.ClassLogicalTypeConverter
 import 
org.apache.flink.table.types.ClassLogicalTypeConverter.getInternalClassForType
 import 
org.apache.flink.table.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
@@ -85,19 +84,12 @@ abstract class CommonLookupJoin(
   with FlinkRelNode {
 
   val joinKeyPairs: Array[IntPair] = getTemporalTableJoinKeyPairs(joinInfo, 
calcOnTemporalTable)
-  val indexKeys: Array[TableIndex] = getTableIndexes(tableSource)
   // all potential index keys, mapping from field index in table source to 
LookupKey
   val allLookupKeys: Map[Int, LookupKey] = analyzeLookupKeys(
     cluster.getRexBuilder,
     joinKeyPairs,
-    indexKeys,
     tableSource.getTableSchema,
     calcOnTemporalTable)
-  // the matched best lookup fields which is in defined order, maybe empty
-  val matchedLookupFields: Option[Array[Int]] = findMatchedIndex(
-    indexKeys,
-    tableSource.getTableSchema,
-    allLookupKeys)
 
   override def deriveRowType(): RelDataType = {
     val flinkTypeFactory = 
cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
@@ -171,12 +163,10 @@ abstract class CommonLookupJoin(
       tableSource,
       inputRowType,
       tableSourceRowType,
-      indexKeys,
       allLookupKeys,
-      matchedLookupFields,
       joinType)
 
-    val lookupFieldsInOrder = matchedLookupFields.get
+    val lookupFieldsInOrder = allLookupKeys.keys.toList.sorted.toArray
     val lookupFieldNamesInOrder = 
lookupFieldsInOrder.map(tableSchema.getFieldNames()(_))
     val lookupFieldTypesInOrder = lookupFieldsInOrder
       .map(tableSchema.getFieldDataTypes()(_)).map(fromDataTypeToLogicalType)
@@ -486,13 +476,8 @@ abstract class CommonLookupJoin(
   def analyzeLookupKeys(
       rexBuilder: RexBuilder,
       joinKeyPairs: Array[IntPair],
-      tableIndexes: Array[TableIndex],
       temporalTableSchema: TableSchema,
       calcOnTemporalTable: Option[RexProgram]): Map[Int, LookupKey] = {
-    val fieldNames = temporalTableSchema.getFieldNames
-    val allIndexFields = tableIndexes
-      .flatMap(_.getIndexedColumns.asScala.map(fieldNames.indexOf(_)))
-      .toSet
     // field_index_in_table_source => constant_lookup_key
     val constantLookupKeys = new mutable.HashMap[Int, ConstantLookupKey]
     // analyze constant lookup keys
@@ -502,42 +487,12 @@ abstract class CommonLookupJoin(
         cluster.getRexBuilder,
         program.expandLocalRef(program.getCondition))
       // presume 'A = 1 AND A = 2' will be reduced to ALWAYS_FALSE
-      extractConstantFieldsFromEquiCondition(condition, allIndexFields, 
constantLookupKeys)
+      extractConstantFieldsFromEquiCondition(condition, constantLookupKeys)
     }
     val fieldRefLookupKeys = joinKeyPairs.map(p => (p.target, 
FieldRefLookupKey(p.source)))
     (constantLookupKeys ++ fieldRefLookupKeys).toMap
   }
 
-  private def findMatchedIndex(
-      tableIndexes: Array[TableIndex],
-      temporalTableSchema: TableSchema,
-      allLookupKeys: Map[Int, LookupKey]): Option[Array[Int]] = {
-
-    val fieldNames = temporalTableSchema.getFieldNames
-
-    // [(indexFields, isUniqueIndex)]
-    val indexes: Array[(Array[Int], Boolean)] = tableIndexes.map { tableIndex 
=>
-      val indexFields = 
tableIndex.getIndexedColumns.asScala.map(fieldNames.indexOf(_)).toArray
-      val isUniqueIndex = tableIndex.getIndexType.equals(IndexType.UNIQUE)
-      (indexFields, isUniqueIndex)
-    }
-
-    val matchedIndexes = indexes.filter(_._1.forall(allLookupKeys.contains))
-    if (matchedIndexes.length > 1) {
-      // find a best one, we prefer a unique index key here
-      val uniqueIndex = matchedIndexes.find(_._2).map(_._1)
-      if (uniqueIndex.isDefined) {
-        uniqueIndex
-      } else {
-        // all the matched index are normal index, select anyone from matched 
indexes
-        matchedIndexes.map(_._1).headOption
-      }
-    } else {
-      // select anyone from matched indexes
-      matchedIndexes.map(_._1).headOption
-    }
-  }
-
   // 
----------------------------------------------------------------------------------------
   //                             Physical Optimization Utilities
   // 
----------------------------------------------------------------------------------------
@@ -569,17 +524,15 @@ abstract class CommonLookupJoin(
 
   private def extractConstantFieldsFromEquiCondition(
       condition: RexNode,
-      allIndexFields: Set[Int],
       constantFieldMap: mutable.HashMap[Int, ConstantLookupKey]): Unit = 
condition match {
     case c: RexCall if c.getKind == SqlKind.AND =>
-      c.getOperands.asScala.foreach(r => extractConstantField(r, 
allIndexFields, constantFieldMap))
-    case rex: RexNode => extractConstantField(rex, allIndexFields, 
constantFieldMap)
+      c.getOperands.asScala.foreach(r => extractConstantField(r, 
constantFieldMap))
+    case rex: RexNode => extractConstantField(rex, constantFieldMap)
     case _ =>
   }
 
   private def extractConstantField(
       pred: RexNode,
-      allIndexFields: Set[Int],
       constantFieldMap: mutable.HashMap[Int, ConstantLookupKey]): Unit = pred 
match {
     case c: RexCall if c.getKind == SqlKind.EQUALS =>
       val left = c.getOperands.get(0)
@@ -588,10 +541,8 @@ abstract class CommonLookupJoin(
         case (literal: RexLiteral, ref: RexInputRef) => (ref, literal)
         case (ref: RexInputRef, literal: RexLiteral) => (ref, literal)
       }
-      if (allIndexFields.contains(inputRef.getIndex)) {
-        val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
-        constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, 
literal))
-      }
+      val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
+      constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, 
literal))
     case _ => // ignore
   }
 
@@ -603,23 +554,14 @@ abstract class CommonLookupJoin(
       tableSource: TableSource[_],
       inputRowType: RowType,
       tableSourceRowType: RowType,
-      tableIndexes: Array[TableIndex],
       allLookupKeys: Map[Int, LookupKey],
-      matchedLookupFields: Option[Array[Int]],
       joinType: JoinRelType): Unit = {
 
-    // checked PRIMARY KEY or (UNIQUE) INDEX is defined.
-    if (tableIndexes.isEmpty) {
-      throw new TableException(
-        s"Temporal table join requires table [${tableSource.explainSource()}] 
defines " +
-          s"a PRIMARY KEY or (UNIQUE) INDEX.")
-    }
-
     // check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
-    if (allLookupKeys.isEmpty || matchedLookupFields.isEmpty) {
+    if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) {
       throw new TableException(
-        "Temporal table join requires an equality condition on ALL fields of " 
+
-          s"table [${tableSource.explainSource()}]'s PRIMARY KEY or (UNIQUE) 
INDEX(s).")
+        "Temporal table join requires an equality condition on fields of " +
+          s"table [${tableSource.explainSource()}].")
     }
 
     if (!tableSource.isInstanceOf[LookupableTableSource[_]]) {
@@ -627,9 +569,8 @@ abstract class CommonLookupJoin(
         s"implement LookupableTableSource interface if it is used in temporal 
table join.")
     }
 
-    val checkedLookupFields = matchedLookupFields.get
 
-    val lookupKeyPairs = joinKeyPairs.filter(p => 
checkedLookupFields.contains(p.target))
+    val lookupKeyPairs = joinKeyPairs.filter(p => 
allLookupKeys.contains(p.target))
     val leftKeys = lookupKeyPairs.map(_.source)
     val rightKeys = lookupKeyPairs.map(_.target)
     val leftKeyTypes = leftKeys.map(inputRowType.getTypeAt)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
index 8773184..7cdfb40 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/util/LookupJoinUtil.scala
@@ -19,11 +19,8 @@
 package org.apache.flink.table.plan.util
 
 import org.apache.calcite.rex.RexLiteral
-import org.apache.flink.table.sources.{DefinedIndexes, DefinedPrimaryKey, 
TableIndex, TableSource}
 import org.apache.flink.table.types.logical.LogicalType
 
-import scala.collection.JavaConverters._
-
 /**
   * Utilities for temporal table join
   */
@@ -47,31 +44,4 @@ object LookupJoinUtil {
     */
   case class FieldRefLookupKey(index: Int) extends LookupKey
 
-  /**
-    * Gets [[TableIndex]]s from a [[TableSource]]. This will combine primary 
key information
-    * of [[DefinedPrimaryKey]] and indexes information of [[DefinedIndexes]].
-    */
-  def getTableIndexes(table: TableSource[_]): Array[TableIndex] = {
-    val indexes: Array[TableIndex] = table match {
-      case t: DefinedIndexes if t.getIndexes != null => 
t.getIndexes.asScala.toArray
-      case _ => Array()
-    }
-
-    // add primary key into index list because primary key is an index too
-    table match {
-      case t: DefinedPrimaryKey =>
-        val primaryKey = t.getPrimaryKeyColumns
-        if (primaryKey != null && !primaryKey.isEmpty) {
-          val primaryKeyIndex = TableIndex.builder()
-              .uniqueIndex()
-              .indexedColumns(primaryKey)
-              .build()
-          indexes ++ Array(primaryKeyIndex)
-        } else {
-          indexes
-        }
-      case _ => indexes
-    }
-  }
-
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index 354c6f4..017e1a1 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -55,15 +55,6 @@ class LookupJoinTest extends TableTestBase {
       classOf[TableException]
     )
 
-    // can't on non-key fields
-    expectExceptionThrown(
-      "SELECT * FROM MyTable AS T JOIN temporalTest " +
-        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.age",
-      "Temporal table join requires an equality condition on ALL fields of 
table " +
-        "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE) 
INDEX(s).",
-      classOf[TableException]
-    )
-
     // only support left or inner join
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
@@ -76,8 +67,8 @@ class LookupJoinTest extends TableTestBase {
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 1",
-      "Temporal table join requires an equality condition on ALL fields of 
table " +
-        "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE) 
INDEX(s).",
+      "Temporal table join requires an equality condition on fields of table " 
+
+        "[TestTemporalTable(id, name, age)].",
       classOf[TableException]
     )
   }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index b4d9a58..790bc8c 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -35,7 +35,6 @@ import org.junit.Test
 
 import _root_.java.lang.{Long => JLong}
 import _root_.java.sql.Timestamp
-import _root_.java.util
 import _root_.java.util.concurrent.CompletableFuture
 import _root_.java.util.{Collection => JCollection}
 
@@ -71,15 +70,6 @@ class LookupJoinTest extends TableTestBase with Serializable 
{
       classOf[TableException]
     )
 
-    // can't on non-key fields
-    expectExceptionThrown(
-      "SELECT * FROM MyTable AS T JOIN temporalTest " +
-        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.age",
-      "Temporal table join requires an equality condition on ALL fields of 
table " +
-        "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE) 
INDEX(s).",
-      classOf[TableException]
-    )
-
     // only support left or inner join
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T RIGHT JOIN temporalTest " +
@@ -92,8 +82,8 @@ class LookupJoinTest extends TableTestBase with Serializable {
     expectExceptionThrown(
       "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
         "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a + 1 = D.id + 2",
-      "Temporal table join requires an equality condition on ALL fields of 
table " +
-        "[TestTemporalTable(id, name, age)]'s PRIMARY KEY or (UNIQUE) 
INDEX(s).",
+      "Temporal table join requires an equality condition on fields of table " 
+
+        "[TestTemporalTable(id, name, age)].",
       classOf[TableException]
     )
 
@@ -354,8 +344,7 @@ class LookupJoinTest extends TableTestBase with 
Serializable {
 
 
 class TestTemporalTable
-  extends LookupableTableSource[BaseRow]
-  with DefinedIndexes {
+  extends LookupableTableSource[BaseRow] {
 
   val fieldNames: Array[String] = Array("id", "name", "age")
   val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, 
Types.INT)
@@ -379,26 +368,13 @@ class TestTemporalTable
   }
 
   override def getTableSchema: TableSchema = new TableSchema(fieldNames, 
fieldTypes)
-
-  override def getIndexes: util.Collection[TableIndex] = {
-    val index1 = TableIndex.builder()
-      .normalIndex()
-      .indexedColumns("name")
-      .build()
-    val index2 = TableIndex.builder()
-      .uniqueIndex()
-      .indexedColumns("id")
-      .build()
-    util.Arrays.asList(index1, index2)
-  }
 }
 
 class TestInvalidTemporalTable private(
     async: Boolean,
     fetcher: TableFunction[_],
     asyncFetcher: AsyncTableFunction[_])
-  extends LookupableTableSource[BaseRow]
-  with DefinedIndexes {
+  extends LookupableTableSource[BaseRow] {
 
   val fieldNames: Array[String] = Array("id", "name", "age", "ts")
   val fieldTypes: Array[TypeInformation[_]] = Array(
@@ -430,13 +406,6 @@ class TestInvalidTemporalTable private(
   }
 
   override def isAsyncEnabled: Boolean = async
-
-  override def getIndexes: util.Collection[TableIndex] = {
-    util.Collections.singleton(TableIndex.builder()
-      .uniqueIndex()
-      .indexedColumns("id", "name", "ts")
-      .build())
-  }
 }
 
 class InvalidTableFunctionResultType extends TableFunction[String] {
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
index fef6522..a8ffa15 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -44,7 +44,6 @@ class LookupJoinITCase extends BatchTestBase {
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .primaryKey("id")
     .build()
 
   val userAsyncTableSource = InMemoryLookupableTableSource.builder()
@@ -52,7 +51,6 @@ class LookupJoinITCase extends BatchTestBase {
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .primaryKey("id")
     .enableAsync()
     .build()
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
index 8575421..c792405 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/AsyncLookupJoinITCase.scala
@@ -52,7 +52,6 @@ class AsyncLookupJoinITCase(backend: StateBackendMode)
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .primaryKey("id")
     .enableAsync()
     .build()
 
@@ -61,7 +60,6 @@ class AsyncLookupJoinITCase(backend: StateBackendMode)
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .addUniqueIndex("id", "name")
     .enableAsync()
     .build()
 
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
index b55b332..74c6c51 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/stream/sql/LookupJoinITCase.scala
@@ -55,7 +55,6 @@ class LookupJoinITCase extends StreamingTestBase {
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .primaryKey("id")
     .build()
 
   val userTableSourceWith2Keys = InMemoryLookupableTableSource.builder()
@@ -63,7 +62,6 @@ class LookupJoinITCase extends StreamingTestBase {
     .field("age", Types.INT)
     .field("id", Types.LONG)
     .field("name", Types.STRING)
-    .addUniqueIndex("id", "name")
     .build()
 
   @Test
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
index 29a116b..2a65683 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/runtime/utils/InMemoryLookupableTableSource.scala
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.functions.{AsyncTableFunction, FunctionContext, 
TableFunction}
 import 
org.apache.flink.table.runtime.utils.InMemoryLookupableTableSource.{InMemoryAsyncLookupFunction,
 InMemoryLookupFunction}
-import org.apache.flink.table.sources.TableIndex.IndexType
 import org.apache.flink.table.sources._
 import org.apache.flink.types.Row
 import org.apache.flink.util.Preconditions
@@ -46,23 +45,8 @@ class InMemoryLookupableTableSource(
     fieldNames: Array[String],
     fieldTypes: Array[TypeInformation[_]],
     data: List[Row],
-    primaryKey: Option[Array[String]],
-    tableIndexes: Array[TableIndex],
     asyncEnabled: Boolean)
-  extends LookupableTableSource[Row]
-  with DefinedPrimaryKey
-  with DefinedIndexes {
-
-  lazy val uniqueKeys: Array[Array[String]] = {
-    val keys = new mutable.ArrayBuffer[Array[String]]()
-    if (getPrimaryKeyColumns != null) {
-      keys += getPrimaryKeyColumns.asScala.toArray
-    }
-    getIndexes.asScala
-      .filter(_.getIndexType == IndexType.UNIQUE)
-      .foreach(keys += _.getIndexedColumns.asScala.toArray)
-    keys.toArray
-  }
+  extends LookupableTableSource[Row] {
 
   val resourceCounter = new AtomicInteger(0)
 
@@ -75,26 +59,15 @@ class InMemoryLookupableTableSource(
   }
 
   private def convertDataToMap(lookupKeys: Array[String]): Map[Row, List[Row]] 
= {
-    val isUniqueKey = uniqueKeys.contains(lookupKeys)
     val lookupFieldIndexes = lookupKeys.map(fieldNames.indexOf(_))
     val map = mutable.HashMap[Row, List[Row]]()
-    if (isUniqueKey) {
-      data.foreach { row =>
-        val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
-        val oldValue = map.put(key, List(row))
-        if (oldValue.isDefined) {
-          throw new IllegalStateException("data contains duplicate keys.")
-        }
-      }
-    } else {
-      data.foreach { row =>
-        val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
-        val oldValue = map.get(key)
-        if (oldValue.isDefined) {
-          map.put(key, oldValue.get ++ List(row))
-        } else {
-          map.put(key, List(row))
-        }
+    data.foreach { row =>
+      val key = Row.of(lookupFieldIndexes.map(row.getField): _*)
+      val oldValue = map.get(key)
+      if (oldValue.isDefined) {
+        map.put(key, oldValue.get ++ List(row))
+      } else {
+        map.put(key, List(row))
       }
     }
     map.toMap
@@ -106,13 +79,6 @@ class InMemoryLookupableTableSource(
 
   override def getTableSchema: TableSchema = new TableSchema(fieldNames, 
fieldTypes)
 
-  override def getPrimaryKeyColumns: util.List[String] = primaryKey match {
-    case Some(pk) => pk.toList.asJava
-    case None => null // return null to indicate no primary key is defined.
-  }
-
-  override def getIndexes: util.Collection[TableIndex] = 
tableIndexes.toList.asJava
-
   @VisibleForTesting
   def getResourceCounter: Int = resourceCounter.get()
 
@@ -136,8 +102,6 @@ object InMemoryLookupableTableSource {
     *       .field("age", Types.INT)
     *       .field("id", Types.LONG)
     *       .field("name", Types.STRING)
-    *       .primaryKey("id")
-    *       .addNormalIndex("name")
     *       .enableAsync()
     *       .build()
     * }}}
@@ -163,16 +127,12 @@ object InMemoryLookupableTableSource {
     *       .field("age", Types.INT)
     *       .field("id", Types.LONG)
     *       .field("name", Types.STRING)
-    *       .primaryKey("id")
-    *       .addNormalIndex("name")
     *       .enableAsync()
     *       .build()
     * }}}
     */
   class Builder {
     private val schema = new mutable.LinkedHashMap[String, 
TypeInformation[_]]()
-    private val tableIndexes = new mutable.ArrayBuffer[TableIndex]()
-    private var primaryKey: Option[Array[String]] = None
     private var data: List[Product] = _
     private var asyncEnabled: Boolean = false
 
@@ -201,50 +161,6 @@ object InMemoryLookupableTableSource {
     }
 
     /**
-      * Sets primary key for the table source.
-      */
-    def primaryKey(fields: String*): Builder = {
-      if (fields.isEmpty) {
-        throw new IllegalArgumentException("fields should not be empty.")
-      }
-      if (primaryKey != null && primaryKey.isDefined) {
-        throw new IllegalArgumentException("primary key has been set.")
-      }
-      this.primaryKey = Some(fields.toArray)
-      this
-    }
-
-    /**
-      * Adds a normal [[TableIndex]] for the table source
-      */
-    def addNormalIndex(fields: String*): Builder = {
-      if (fields.isEmpty) {
-        throw new IllegalArgumentException("fields should not be empty.")
-      }
-      val index = TableIndex.builder()
-        .normalIndex()
-        .indexedColumns(fields: _*)
-        .build()
-      tableIndexes += index
-      this
-    }
-
-    /**
-      * Adds an unique [[TableIndex]] for the table source
-      */
-    def addUniqueIndex(fields: String*): Builder = {
-      if (fields.isEmpty) {
-        throw new IllegalArgumentException("fields should not be empty.")
-      }
-      val index = TableIndex.builder()
-        .uniqueIndex()
-        .indexedColumns(fields: _*)
-        .build()
-      tableIndexes += index
-      this
-    }
-
-    /**
       * Enables async lookup for the table source
       */
     def enableAsync(): Builder = {
@@ -269,8 +185,6 @@ object InMemoryLookupableTableSource {
         fieldNames,
         fieldTypes,
         rowData,
-        primaryKey,
-        tableIndexes.toArray,
         asyncEnabled
       )
     }

Reply via email to