This is an automated email from the ASF dual-hosted git repository.

renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new bf81768ff56 [FLINK-28868][connector/hbase] Migrate HBase table 
connector to the new LookupFunction interface
bf81768ff56 is described below

commit bf81768ff564c5bf4a57cb33c6f5126b83b28fb5
Author: Qingsheng Ren <[email protected]>
AuthorDate: Mon Aug 8 17:51:13 2022 +0800

    [FLINK-28868][connector/hbase] Migrate HBase table connector to the new 
LookupFunction interface
    
    This closes #20495
---
 .../b8900323-6aab-4e7e-9b17-f53b3c3dca46           |  12 +--
 .../hbase1/HBase1DynamicTableFactory.java          |  31 +++++-
 .../hbase1/source/HBaseDynamicTableSource.java     |  34 +++++--
 .../hbase1/HBaseDynamicTableFactoryTest.java       |  66 ++++++------
 .../hbase2/HBase2DynamicTableFactory.java          |  39 ++++++-
 .../hbase2/source/HBaseDynamicTableSource.java     |  66 +++++++++---
 .../source/HBaseRowDataAsyncLookupFunction.java    |  66 +++---------
 .../hbase2/HBaseDynamicTableFactoryTest.java       |  71 +++++++------
 .../HBaseRowDataAsyncLookupFunctionTest.java       |  23 ++---
 .../hbase/options/HBaseLookupOptions.java          | 113 ---------------------
 .../source/AbstractHBaseDynamicTableSource.java    |  37 +++++--
 .../hbase/source/HBaseRowDataLookupFunction.java   |  63 ++++--------
 .../hbase/table/HBaseConnectorOptions.java         |   6 ++
 .../hbase/table/HBaseConnectorOptionsUtil.java     |  12 ---
 .../connector/hbase/util/HBaseTableSchema.java     |  33 ++++++
 15 files changed, 325 insertions(+), 347 deletions(-)

diff --git 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
index d3793a81c75..8ce680943af 100644
--- 
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
+++ 
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
@@ -1,14 +1,14 @@
 Constructor 
<org.apache.flink.connector.hbase.sink.HBaseSinkFunction.<init>(java.lang.String,
 org.apache.hadoop.conf.Configuration, 
org.apache.flink.connector.hbase.sink.HBaseMutationConverter, long, long, 
long)> has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseSinkFunction.java:0)
-Constructor 
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(AbstractHBaseDynamicTableSource.java:0)
-Constructor 
<org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseRowDataLookupFunction.java:0)
+Constructor 
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, int, 
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has 
parameter of type <org.apache.hadoop.conf.Configuration> in 
(AbstractHBaseDynamicTableSource.java:0)
+Constructor 
<org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, int)> has parameter of type 
<org.apache.hadoop.conf.Configuration> in (HBaseRowDataLookupFunction.java:0)
 Constructor 
<org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.<init>(java.lang.String,
 org.apache.flink.connector.hbase.util.HBaseTableSchema, 
org.apache.hadoop.conf.Configuration, 
org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSink.java:0)
 Constructor 
<org.apache.flink.connector.hbase1.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)>
 has parameter of type <org.apache.hadoop.conf.Configuration> in 
(AbstractTableInputFormat.java:0)
-Constructor 
<org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSource.java:0)
+Constructor 
<org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, int, 
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has 
parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSource.java:0)
 Constructor 
<org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> 
in (HBaseRowDataInputFormat.java:0)
 Constructor 
<org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink.<init>(java.lang.String,
 org.apache.flink.connector.hbase.util.HBaseTableSchema, 
org.apache.hadoop.conf.Configuration, 
org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSink.java:0)
 Constructor 
<org.apache.flink.connector.hbase2.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)>
 has parameter of type <org.apache.hadoop.conf.Configuration> in 
(AbstractTableInputFormat.java:0)
-Constructor 
<org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSource.java:0)
-Constructor 
<org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)> 
has parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseRowDataAsyncLookupFunction.java:0)
+Constructor 
<org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, int, boolean, 
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has 
parameter of type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSource.java:0)
+Constructor 
<org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String, int)> has parameter of type 
<org.apache.hadoop.conf.Configuration> in 
(HBaseRowDataAsyncLookupFunction.java:0)
 Constructor 
<org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration,
 java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema, 
java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration> 
in (HBaseRowDataInputFormat.java:0)
 Field 
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.conf> 
has type <org.apache.hadoop.conf.Configuration> in 
(AbstractHBaseDynamicTableSource.java:0)
 Field <org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.hbaseConf> 
has type <org.apache.hadoop.conf.Configuration> in 
(HBaseDynamicTableSink.java:0)
@@ -92,4 +92,4 @@ Method 
<org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.la
 Method 
<org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$1(java.lang.Class,
 
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter$JdbcDeserializationConverter,
 java.lang.Object)> calls method <org.postgresql.jdbc.PgArray.getArray()> in 
(PostgresRowConverter.java:90)
 Method 
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
 calls method <org.apache.flink.streaming.api.scala.DataStream.javaStream()> in 
(CassandraSink.java:205)
 Method 
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
 has generic parameter type 
<org.apache.flink.streaming.api.scala.DataStream<IN>> with type argument 
depending on <org.apache.flink.streaming.api.scala.DataStream> in 
(CassandraSink.java:0)
-Method 
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
 has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in 
(CassandraSink.java:0)
\ No newline at end of file
+Method 
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
 has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in 
(CassandraSink.java:0)
diff --git 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 6a3e6ba9379..fbc793ca3ca 100644
--- 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -27,12 +27,16 @@ import 
org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -52,7 +56,6 @@ import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKE
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
-import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
 import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
@@ -78,13 +81,31 @@ public class HBase1DynamicTableFactory
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
                 
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
+        LookupCache cache = null;
+
+        // Backward compatible to legacy caching options
+        if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) 
> 0) {
+            cache =
+                    DefaultLookupCache.newBuilder()
+                            
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            .build();
+        }
+
+        if (tableOptions
+                .get(LookupOptions.CACHE_TYPE)
+                .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+            cache = DefaultLookupCache.fromConfig(tableOptions);
+        }
 
         return new HBaseDynamicTableSource(
                 hbaseClientConf,
                 tableName,
                 hbaseSchema,
                 nullStringLiteral,
-                getHBaseLookupOptions(tableOptions));
+                tableOptions.get(LookupOptions.MAX_RETRIES),
+                cache);
     }
 
     @Override
@@ -133,6 +154,12 @@ public class HBase1DynamicTableFactory
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
         set.add(LOOKUP_MAX_RETRIES);
+        set.add(LookupOptions.CACHE_TYPE);
+        set.add(LookupOptions.MAX_RETRIES);
+        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+        set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+        set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
         return set;
     }
 
diff --git 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
index 5375fd31ee1..0322c947ff3 100644
--- 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
@@ -19,16 +19,19 @@
 package org.apache.flink.connector.hbase1.source;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 
 import org.apache.hadoop.conf.Configuration;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 /** HBase table source implementation. */
 @Internal
 public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
@@ -38,14 +41,15 @@ public class HBaseDynamicTableSource extends 
AbstractHBaseDynamicTableSource {
             String tableName,
             HBaseTableSchema hbaseSchema,
             String nullStringLiteral,
-            HBaseLookupOptions lookupOptions) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+            int maxRetryTimes,
+            @Nullable LookupCache cache) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, 
cache);
     }
 
     @Override
     public DynamicTableSource copy() {
         return new HBaseDynamicTableSource(
-                conf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions);
+                conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes, cache);
     }
 
     @Override
@@ -53,8 +57,22 @@ public class HBaseDynamicTableSource extends 
AbstractHBaseDynamicTableSource {
         return new HBaseRowDataInputFormat(conf, tableName, hbaseSchema, 
nullStringLiteral);
     }
 
-    @VisibleForTesting
-    public HBaseLookupOptions getLookupOptions() {
-        return this.lookupOptions;
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof HBaseDynamicTableSource)) {
+            return false;
+        }
+        HBaseDynamicTableSource that = (HBaseDynamicTableSource) o;
+        return Objects.equals(conf, that.conf)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(hbaseSchema, that.hbaseSchema)
+                && Objects.equals(nullStringLiteral, that.nullStringLiteral)
+                && Objects.equals(maxRetryTimes, that.maxRetryTimes)
+                && Objects.equals(cache, that.cache);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes, cache);
     }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
index b4e30b0ef9f..8a8f1d7068a 100644
--- 
a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
@@ -19,10 +19,8 @@
 package org.apache.flink.connector.hbase1;
 
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
-import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
@@ -32,8 +30,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.functions.LookupFunction;
 import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import 
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
@@ -45,10 +44,12 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.flink.connector.hbase.util.HBaseConfigurationUtil.getHBaseConfiguration;
 import static org.apache.flink.table.api.DataTypes.BIGINT;
 import static org.apache.flink.table.api.DataTypes.BOOLEAN;
 import static org.apache.flink.table.api.DataTypes.DATE;
@@ -62,6 +63,7 @@ import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -111,10 +113,10 @@ public class HBaseDynamicTableFactoryTest {
         int[][] lookupKey = {{2}};
         LookupTableSource.LookupRuntimeProvider lookupProvider =
                 hbaseSource.getLookupRuntimeProvider(new 
LookupRuntimeProviderContext(lookupKey));
-        assertTrue(lookupProvider instanceof TableFunctionProvider);
+        assertTrue(lookupProvider instanceof LookupFunctionProvider);
 
-        TableFunction tableFunction =
-                ((TableFunctionProvider) lookupProvider).createTableFunction();
+        LookupFunction tableFunction =
+                ((LookupFunctionProvider) 
lookupProvider).createLookupFunction();
         assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
         assertEquals(
                 "testHBastTable", ((HBaseRowDataLookupFunction) 
tableFunction).getHTableName());
@@ -141,6 +143,32 @@ public class HBaseDynamicTableFactoryTest {
                 hbaseSchema.getQualifierDataTypes("f4"));
     }
 
+    @Test
+    public void testLookupOptions() {
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, 
STRING()));
+        Map<String, String> options = getAllOptions();
+        options.put("lookup.cache", "PARTIAL");
+        options.put("lookup.partial-cache.expire-after-access", "15213s");
+        options.put("lookup.partial-cache.expire-after-write", "18213s");
+        options.put("lookup.partial-cache.max-rows", "10000");
+        options.put("lookup.partial-cache.cache-missing-key", "false");
+        options.put("lookup.max-retries", "15513");
+
+        DynamicTableSource source = createTableSource(schema, options);
+        HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;
+        assertThat(((HBaseDynamicTableSource) 
source).getMaxRetryTimes()).isEqualTo(15513);
+        
assertThat(hbaseSource.getCache()).isInstanceOf(DefaultLookupCache.class);
+        DefaultLookupCache cache = (DefaultLookupCache) hbaseSource.getCache();
+        assertThat(cache)
+                .isEqualTo(
+                        DefaultLookupCache.newBuilder()
+                                .expireAfterAccess(Duration.ofSeconds(15213))
+                                .expireAfterWrite(Duration.ofSeconds(18213))
+                                .maximumSize(10000)
+                                .cacheMissingKey(false)
+                                .build());
+    }
+
     @Test
     public void testTableSinkFactory() {
         ResolvedSchema schema =
@@ -184,8 +212,7 @@ public class HBaseDynamicTableFactoryTest {
                 hbaseSchema.getQualifierDataTypes("f4"));
 
         // verify hadoop Configuration
-        org.apache.hadoop.conf.Configuration expectedConfiguration =
-                HBaseConfigurationUtil.getHBaseConfiguration();
+        org.apache.hadoop.conf.Configuration expectedConfiguration = 
getHBaseConfiguration();
         expectedConfiguration.set(HConstants.ZOOKEEPER_QUORUM, 
"localhost:2181");
         expectedConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/flink");
         expectedConfiguration.set("hbase.security.authentication", "kerberos");
@@ -245,27 +272,6 @@ public class HBaseDynamicTableFactoryTest {
         assertEquals(2, (long) provider.getParallelism().get());
     }
 
-    @Test
-    public void testLookupOptions() {
-        Map<String, String> options = getAllOptions();
-        options.put("lookup.cache.max-rows", "1000");
-        options.put("lookup.cache.ttl", "10s");
-        options.put("lookup.max-retries", "10");
-        ResolvedSchema schema =
-                ResolvedSchema.of(
-                        Column.physical(ROWKEY, STRING()),
-                        Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()), 
FIELD(COL2, INT()))));
-        DynamicTableSource source = createTableSource(schema, options);
-        HBaseLookupOptions actual = ((HBaseDynamicTableSource) 
source).getLookupOptions();
-        HBaseLookupOptions expected =
-                HBaseLookupOptions.builder()
-                        .setCacheMaxSize(1000)
-                        .setCacheExpireMs(10_000)
-                        .setMaxRetryTimes(10)
-                        .build();
-        assertEquals(expected, actual);
-    }
-
     @Test
     public void testDisabledBufferFlushOptions() {
         Map<String, String> options = getAllOptions();
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 9b9a5258823..8c5513710e3 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -21,19 +21,22 @@ package org.apache.flink.connector.hbase2;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
 import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.factories.DynamicTableSinkFactory;
 import org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 
 import org.apache.hadoop.conf.Configuration;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -53,7 +56,6 @@ import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKE
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
-import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
 import static 
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
@@ -76,13 +78,36 @@ public class HBase2DynamicTableFactory
 
         String tableName = tableOptions.get(TABLE_NAME);
         Configuration hbaseConf = getHBaseConfiguration(tableOptions);
-        HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
                 
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
 
+        LookupCache cache = null;
+
+        // Backward compatible to legacy caching options
+        if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+                && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) 
> 0) {
+            cache =
+                    DefaultLookupCache.newBuilder()
+                            
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+                            
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+                            .build();
+        }
+
+        if (tableOptions
+                .get(LookupOptions.CACHE_TYPE)
+                .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+            cache = DefaultLookupCache.fromConfig(tableOptions);
+        }
+
         return new HBaseDynamicTableSource(
-                hbaseConf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions);
+                hbaseConf,
+                tableName,
+                hbaseSchema,
+                nullStringLiteral,
+                tableOptions.get(LookupOptions.MAX_RETRIES),
+                tableOptions.get(LOOKUP_ASYNC),
+                cache);
     }
 
     @Override
@@ -131,6 +156,12 @@ public class HBase2DynamicTableFactory
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
         set.add(LOOKUP_MAX_RETRIES);
+        set.add(LookupOptions.CACHE_TYPE);
+        set.add(LookupOptions.MAX_RETRIES);
+        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+        set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+        set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+        set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
         return set;
     }
 
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
index 806455c3b5d..7c88703ca96 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
@@ -19,33 +19,43 @@
 package org.apache.flink.connector.hbase2.source;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource;
 import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import 
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import 
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
+import 
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** HBase table source implementation. */
 @Internal
 public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
 
+    private final boolean lookupAsync;
+
     public HBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
             String nullStringLiteral,
-            HBaseLookupOptions lookupOptions) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+            int maxRetryTimes,
+            boolean lookupAsync,
+            @Nullable LookupCache cache) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes, 
cache);
+        this.lookupAsync = lookupAsync;
     }
 
     @Override
@@ -61,21 +71,31 @@ public class HBaseDynamicTableSource extends 
AbstractHBaseDynamicTableSource {
                         .get(context.getKeys()[0][0])
                         .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey 
field.");
-        if (lookupOptions.getLookupAsync()) {
-            return AsyncTableFunctionProvider.of(
+        if (lookupAsync) {
+            HBaseRowDataAsyncLookupFunction asyncLookupFunction =
                     new HBaseRowDataAsyncLookupFunction(
-                            conf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions));
+                            conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes);
+            if (cache != null) {
+                return 
PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
+            } else {
+                return AsyncLookupFunctionProvider.of(asyncLookupFunction);
+            }
         } else {
-            return TableFunctionProvider.of(
+            HBaseRowDataLookupFunction lookupFunction =
                     new HBaseRowDataLookupFunction(
-                            conf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions));
+                            conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes);
+            if (cache != null) {
+                return PartialCachingLookupProvider.of(lookupFunction, cache);
+            } else {
+                return LookupFunctionProvider.of(lookupFunction);
+            }
         }
     }
 
     @Override
     public DynamicTableSource copy() {
         return new HBaseDynamicTableSource(
-                conf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions);
+                conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes, lookupAsync, cache);
     }
 
     @Override
@@ -83,8 +103,24 @@ public class HBaseDynamicTableSource extends 
AbstractHBaseDynamicTableSource {
         return new HBaseRowDataInputFormat(conf, tableName, hbaseSchema, 
nullStringLiteral);
     }
 
-    @VisibleForTesting
-    public HBaseLookupOptions getLookupOptions() {
-        return this.lookupOptions;
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof HBaseDynamicTableSource)) {
+            return false;
+        }
+        HBaseDynamicTableSource that = (HBaseDynamicTableSource) o;
+        return Objects.equals(conf, that.conf)
+                && Objects.equals(tableName, that.tableName)
+                && Objects.equals(hbaseSchema, that.hbaseSchema)
+                && Objects.equals(nullStringLiteral, that.nullStringLiteral)
+                && Objects.equals(maxRetryTimes, that.maxRetryTimes)
+                && Objects.equals(cache, that.cache)
+                && Objects.equals(lookupAsync, that.lookupAsync);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes, cache, lookupAsync);
     }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
index a41c7a595ba..697d5fd9eab 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
@@ -20,21 +20,16 @@ package org.apache.flink.connector.hbase2.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.AsyncLookupFunction;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -56,14 +51,13 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 /**
  * The HBaseRowDataAsyncLookupFunction is an implementation to lookup HBase 
data by rowkey in async
  * fashion. It looks up the result as {@link RowData}.
  */
 @Internal
-public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData> {
+public class HBaseRowDataAsyncLookupFunction extends AsyncLookupFunction {
 
     private static final Logger LOG =
             LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
@@ -78,10 +72,7 @@ public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
     private transient AsyncTable<ScanResultConsumer> table;
     private transient HBaseSerde serde;
 
-    private final long cacheMaxSize;
-    private final long cacheExpireMs;
     private final int maxRetryTimes;
-    private transient Cache<Object, RowData> cache;
 
     /** The size for thread pool. */
     private static final int THREAD_POOL_SIZE = 16;
@@ -91,14 +82,12 @@ public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
             String hTableName,
             HBaseTableSchema hbaseTableSchema,
             String nullStringLiteral,
-            HBaseLookupOptions lookupOptions) {
+            int maxRetryTimes) {
         this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
         this.hTableName = hTableName;
         this.hbaseTableSchema = hbaseTableSchema;
         this.nullStringLiteral = nullStringLiteral;
-        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
-        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
-        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+        this.maxRetryTimes = maxRetryTimes;
     }
 
     @Override
@@ -115,19 +104,6 @@ public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
         try {
             asyncConnection = asyncConnectionFuture.get();
             table = asyncConnection.getTable(TableName.valueOf(hTableName), 
threadPool);
-
-            this.cache =
-                    cacheMaxSize <= 0 || cacheExpireMs <= 0
-                            ? null
-                            : CacheBuilder.newBuilder()
-                                    .recordStats()
-                                    .expireAfterWrite(cacheExpireMs, 
TimeUnit.MILLISECONDS)
-                                    .maximumSize(cacheMaxSize)
-                                    .build();
-            if (cache != null && context != null) {
-                context.getMetricGroup()
-                        .gauge("lookupCacheHitRate", (Gauge<Double>) () -> 
cache.stats().hitRate());
-            }
         } catch (InterruptedException | ExecutionException e) {
             LOG.error("Exception while creating connection to HBase.", e);
             throw new RuntimeException("Cannot create connection to HBase.", 
e);
@@ -139,24 +115,15 @@ public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
     /**
      * The invoke entry point of lookup function.
      *
-     * @param future The result or exception is returned.
-     * @param rowKey the lookup key. Currently only support single rowkey.
+     * @param keyRow A {@link RowData} that wraps lookup keys. Currently only 
support single rowkey.
      */
-    public void eval(CompletableFuture<Collection<RowData>> future, Object 
rowKey) {
+    @Override
+    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
         int currentRetry = 0;
-        if (cache != null) {
-            RowData cacheRowData = cache.getIfPresent(rowKey);
-            if (cacheRowData != null) {
-                if (cacheRowData.getArity() == 0) {
-                    future.complete(Collections.emptyList());
-                } else {
-                    future.complete(Collections.singletonList(cacheRowData));
-                }
-                return;
-            }
-        }
+        CompletableFuture<Collection<RowData>> future = new 
CompletableFuture<>();
         // fetch result
-        fetchResult(future, currentRetry, rowKey);
+        fetchResult(future, currentRetry, ((GenericRowData) 
keyRow).getField(0));
+        return future;
     }
 
     /**
@@ -199,18 +166,9 @@ public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData>
                     } else {
                         if (result.isEmpty()) {
                             resultFuture.complete(Collections.emptyList());
-                            if (cache != null) {
-                                cache.put(rowKey, new GenericRowData(0));
-                            }
                         } else {
-                            if (cache != null) {
-                                RowData rowData = 
serde.convertToNewRow(result);
-                                
resultFuture.complete(Collections.singletonList(rowData));
-                                cache.put(rowKey, rowData);
-                            } else {
-                                resultFuture.complete(
-                                        
Collections.singletonList(serde.convertToNewRow(result)));
-                            }
+                            resultFuture.complete(
+                                    
Collections.singletonList(serde.convertToNewRow(result)));
                         }
                     }
                 });
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
index 7d0f900ab3c..a535f0e0c58 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.connector.hbase2;
 
 import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -31,12 +30,13 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.TableFunction;
+import 
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.LookupFunction;
 import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import 
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
@@ -48,6 +48,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -65,6 +66,7 @@ import static org.apache.flink.table.api.DataTypes.TIME;
 import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
 import static 
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -114,10 +116,10 @@ public class HBaseDynamicTableFactoryTest {
         int[][] lookupKey = {{2}};
         LookupTableSource.LookupRuntimeProvider lookupProvider =
                 hbaseSource.getLookupRuntimeProvider(new 
LookupRuntimeProviderContext(lookupKey));
-        assertTrue(lookupProvider instanceof TableFunctionProvider);
+        assertTrue(lookupProvider instanceof LookupFunctionProvider);
 
-        TableFunction tableFunction =
-                ((TableFunctionProvider) lookupProvider).createTableFunction();
+        LookupFunction tableFunction =
+                ((LookupFunctionProvider) 
lookupProvider).createLookupFunction();
         assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
         assertEquals(
                 "testHBastTable", ((HBaseRowDataLookupFunction) 
tableFunction).getHTableName());
@@ -144,6 +146,32 @@ public class HBaseDynamicTableFactoryTest {
                 hbaseSchema.getQualifierDataTypes("f4"));
     }
 
+    @Test
+    public void testLookupOptions() {
+        ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY, 
STRING()));
+        Map<String, String> options = getAllOptions();
+        options.put("lookup.cache", "PARTIAL");
+        options.put("lookup.partial-cache.expire-after-access", "15213s");
+        options.put("lookup.partial-cache.expire-after-write", "18213s");
+        options.put("lookup.partial-cache.max-rows", "10000");
+        options.put("lookup.partial-cache.cache-missing-key", "false");
+        options.put("lookup.max-retries", "15513");
+
+        DynamicTableSource source = createTableSource(schema, options);
+        HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;
+        assertThat(((HBaseDynamicTableSource) 
source).getMaxRetryTimes()).isEqualTo(15513);
+        
assertThat(hbaseSource.getCache()).isInstanceOf(DefaultLookupCache.class);
+        DefaultLookupCache cache = (DefaultLookupCache) hbaseSource.getCache();
+        assertThat(cache)
+                .isEqualTo(
+                        DefaultLookupCache.newBuilder()
+                                .expireAfterAccess(Duration.ofSeconds(15213))
+                                .expireAfterWrite(Duration.ofSeconds(18213))
+                                .maximumSize(10000)
+                                .cacheMissingKey(false)
+                                .build());
+    }
+
     @Test
     public void testTableSinkFactory() {
         ResolvedSchema schema =
@@ -248,27 +276,6 @@ public class HBaseDynamicTableFactoryTest {
         assertEquals(2, (long) provider.getParallelism().get());
     }
 
-    @Test
-    public void testLookupOptions() {
-        Map<String, String> options = getAllOptions();
-        options.put("lookup.cache.max-rows", "1000");
-        options.put("lookup.cache.ttl", "10s");
-        options.put("lookup.max-retries", "10");
-        ResolvedSchema schema =
-                ResolvedSchema.of(
-                        Column.physical(ROWKEY, STRING()),
-                        Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()), 
FIELD(COL2, INT()))));
-        DynamicTableSource source = createTableSource(schema, options);
-        HBaseLookupOptions actual = ((HBaseDynamicTableSource) 
source).getLookupOptions();
-        HBaseLookupOptions expected =
-                HBaseLookupOptions.builder()
-                        .setCacheMaxSize(1000)
-                        .setCacheExpireMs(10_000)
-                        .setMaxRetryTimes(10)
-                        .build();
-        assertEquals(expected, actual);
-    }
-
     @Test
     public void testLookupAsync() {
         Map<String, String> options = getAllOptions();
@@ -284,10 +291,10 @@ public class HBaseDynamicTableFactoryTest {
         int[][] lookupKey = {{0}};
         LookupTableSource.LookupRuntimeProvider lookupProvider =
                 hbaseSource.getLookupRuntimeProvider(new 
LookupRuntimeProviderContext(lookupKey));
-        assertTrue(lookupProvider instanceof AsyncTableFunctionProvider);
+        assertTrue(lookupProvider instanceof AsyncLookupFunctionProvider);
 
-        AsyncTableFunction asyncTableFunction =
-                ((AsyncTableFunctionProvider) 
lookupProvider).createAsyncTableFunction();
+        AsyncLookupFunction asyncTableFunction =
+                ((AsyncLookupFunctionProvider) 
lookupProvider).createAsyncLookupFunction();
         assertTrue(asyncTableFunction instanceof 
HBaseRowDataAsyncLookupFunction);
         assertEquals(
                 "testHBastTable",
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
index 2f8cba68804..312041f220f 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
@@ -18,18 +18,16 @@
 
 package org.apache.flink.connector.hbase2.source;
 
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.connector.hbase2.util.HBaseTestBase;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -48,15 +46,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 /** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
-@RunWith(Parameterized.class)
 public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
-    @Parameterized.Parameter public boolean useCache;
-
-    @Parameterized.Parameters(name = "use cache = {0}")
-    public static Object[] parameters() {
-        return new Object[][] {new Object[] {true}, new Object[] {false}};
-    }
-
     @Test
     public void testEval() throws Exception {
         HBaseRowDataAsyncLookupFunction lookupFunction = 
buildRowDataAsyncLookupFunction();
@@ -99,11 +89,6 @@ public class HBaseRowDataAsyncLookupFunctionTest extends 
HBaseTestBase {
     }
 
     private HBaseRowDataAsyncLookupFunction buildRowDataAsyncLookupFunction() {
-        HBaseLookupOptions lookupOptions = 
HBaseLookupOptions.builder().build();
-        if (useCache) {
-            lookupOptions =
-                    
HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build();
-        }
         DataType dataType =
                 ROW(
                         FIELD(ROW_KEY, INT()),
@@ -117,6 +102,10 @@ public class HBaseRowDataAsyncLookupFunctionTest extends 
HBaseTestBase {
                                         FIELD(F3COL3, STRING()))));
         HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
         return new HBaseRowDataAsyncLookupFunction(
-                getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions);
+                getConf(),
+                TEST_TABLE_1,
+                hbaseSchema,
+                "null",
+                LookupOptions.MAX_RETRIES.defaultValue());
     }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
deleted file mode 100644
index 192eb077097..00000000000
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.hbase.options;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** Options for the HBase lookup. */
-@Internal
-public class HBaseLookupOptions implements Serializable {
-    private static final long serialVersionUID = 1L;
-    private static final int DEFAULT_MAX_RETRY_TIMES = 3;
-
-    private final long cacheMaxSize;
-    private final long cacheExpireMs;
-    private final int maxRetryTimes;
-    private final boolean lookupAsync;
-
-    public HBaseLookupOptions(
-            long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean 
lookupAsync) {
-        this.cacheMaxSize = cacheMaxSize;
-        this.cacheExpireMs = cacheExpireMs;
-        this.maxRetryTimes = maxRetryTimes;
-        this.lookupAsync = lookupAsync;
-    }
-
-    public long getCacheMaxSize() {
-        return cacheMaxSize;
-    }
-
-    public long getCacheExpireMs() {
-        return cacheExpireMs;
-    }
-
-    public int getMaxRetryTimes() {
-        return maxRetryTimes;
-    }
-
-    public boolean getLookupAsync() {
-        return lookupAsync;
-    }
-
-    public static Builder builder() {
-        return new Builder();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (o instanceof HBaseLookupOptions) {
-            HBaseLookupOptions options = (HBaseLookupOptions) o;
-            return Objects.equals(cacheMaxSize, options.cacheMaxSize)
-                    && Objects.equals(cacheExpireMs, options.cacheExpireMs)
-                    && Objects.equals(maxRetryTimes, options.maxRetryTimes)
-                    && Objects.equals(lookupAsync, options.lookupAsync);
-        } else {
-            return false;
-        }
-    }
-
-    /** Builder of {@link HBaseLookupOptions}. */
-    public static class Builder {
-        private long cacheMaxSize = -1L;
-        private long cacheExpireMs = 0L;
-        private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
-        private boolean lookupAsync = false;
-
-        /** optional, lookup cache max size, over this value, the old data 
will be eliminated. */
-        public Builder setCacheMaxSize(long cacheMaxSize) {
-            this.cacheMaxSize = cacheMaxSize;
-            return this;
-        }
-
-        /** optional, lookup cache expire mills, over this time, the old data 
will expire. */
-        public Builder setCacheExpireMs(long cacheExpireMs) {
-            this.cacheExpireMs = cacheExpireMs;
-            return this;
-        }
-
-        /** optional, max retry times for Hbase connector. */
-        public Builder setMaxRetryTimes(int maxRetryTimes) {
-            this.maxRetryTimes = maxRetryTimes;
-            return this;
-        }
-
-        /** optional, whether to set async lookup. */
-        public Builder setLookupAsync(boolean lookupAsync) {
-            this.lookupAsync = lookupAsync;
-            return this;
-        }
-
-        public HBaseLookupOptions build() {
-            return new HBaseLookupOptions(cacheMaxSize, cacheExpireMs, 
maxRetryTimes, lookupAsync);
-        }
-    }
-}
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
index bf1aa01fa04..c530cde003a 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
@@ -21,20 +21,23 @@ package org.apache.flink.connector.hbase.source;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.Projection;
 import org.apache.flink.table.connector.source.InputFormatProvider;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import 
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.DataType;
 
 import org.apache.hadoop.conf.Configuration;
 
+import javax.annotation.Nullable;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** HBase table source implementation. */
@@ -46,19 +49,22 @@ public abstract class AbstractHBaseDynamicTableSource
     protected final String tableName;
     protected HBaseTableSchema hbaseSchema;
     protected final String nullStringLiteral;
-    protected final HBaseLookupOptions lookupOptions;
+    protected final int maxRetryTimes;
+    @Nullable protected final LookupCache cache;
 
     public AbstractHBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
             String nullStringLiteral,
-            HBaseLookupOptions lookupOptions) {
+            int maxRetryTimes,
+            @Nullable LookupCache cache) {
         this.conf = conf;
         this.tableName = tableName;
         this.hbaseSchema = hbaseSchema;
         this.nullStringLiteral = nullStringLiteral;
-        this.lookupOptions = lookupOptions;
+        this.maxRetryTimes = maxRetryTimes;
+        this.cache = cache;
     }
 
     @Override
@@ -81,10 +87,14 @@ public abstract class AbstractHBaseDynamicTableSource
                         .get(context.getKeys()[0][0])
                         .equals(hbaseSchema.getRowKeyName().get()),
                 "Currently, HBase table only supports lookup by rowkey 
field.");
-
-        return TableFunctionProvider.of(
+        HBaseRowDataLookupFunction lookupFunction =
                 new HBaseRowDataLookupFunction(
-                        conf, tableName, hbaseSchema, nullStringLiteral, 
lookupOptions));
+                        conf, tableName, hbaseSchema, nullStringLiteral, 
maxRetryTimes);
+        if (cache != null) {
+            return PartialCachingLookupProvider.of(lookupFunction, cache);
+        } else {
+            return LookupFunctionProvider.of(lookupFunction);
+        }
     }
 
     @Override
@@ -116,4 +126,15 @@ public abstract class AbstractHBaseDynamicTableSource
     public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseSchema;
     }
+
+    @VisibleForTesting
+    public int getMaxRetryTimes() {
+        return maxRetryTimes;
+    }
+
+    @VisibleForTesting
+    @Nullable
+    public LookupCache getCache() {
+        return cache;
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 7d7acbccee4..0e1ba542e5b 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -20,19 +20,15 @@ package org.apache.flink.connector.hbase.source;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseSerde;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.metrics.Gauge;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.LookupFunction;
 import org.apache.flink.util.StringUtils;
 
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -46,7 +42,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.TimeUnit;
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * The HBaseRowDataLookupFunction is a standard user-defined table function, 
it can be used in
@@ -54,7 +51,7 @@ import java.util.concurrent.TimeUnit;
  * RowData}.
  */
 @Internal
-public class HBaseRowDataLookupFunction extends TableFunction<RowData> {
+public class HBaseRowDataLookupFunction extends LookupFunction {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
     private static final long serialVersionUID = 1L;
@@ -68,54 +65,39 @@ public class HBaseRowDataLookupFunction extends 
TableFunction<RowData> {
     private transient HTable table;
     private transient HBaseSerde serde;
 
-    private final long cacheMaxSize;
-    private final long cacheExpireMs;
     private final int maxRetryTimes;
-    private transient Cache<Object, RowData> cache;
 
     public HBaseRowDataLookupFunction(
             Configuration configuration,
             String hTableName,
             HBaseTableSchema hbaseTableSchema,
             String nullStringLiteral,
-            HBaseLookupOptions lookupOptions) {
+            int maxRetryTimes) {
         this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
         this.hTableName = hTableName;
         this.hbaseTableSchema = hbaseTableSchema;
         this.nullStringLiteral = nullStringLiteral;
-        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
-        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
-        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+        this.maxRetryTimes = maxRetryTimes;
     }
 
     /**
      * The invoke entry point of lookup function.
      *
-     * @param rowKey the lookup key. Currently only support single rowkey.
+     * @param keyRow - A {@link RowData} that wraps lookup keys. Currently 
only support single
+     *     rowkey.
      */
-    public void eval(Object rowKey) throws IOException {
-        if (cache != null) {
-            RowData cacheRowData = cache.getIfPresent(rowKey);
-            if (cacheRowData != null) {
-                collect(cacheRowData);
-                return;
-            }
-        }
+    @Override
+    public Collection<RowData> lookup(RowData keyRow) throws IOException {
         for (int retry = 0; retry <= maxRetryTimes; retry++) {
             try {
-                // fetch result
-                Get get = serde.createGet(rowKey);
+                // TODO: The implementation of LookupFunction will pass a 
GenericRowData as key row
+                // and it's safe to cast for now. We need to update the logic 
once we improve the
+                // LookupFunction in the future.
+                Get get = serde.createGet(((GenericRowData) 
keyRow).getField(0));
                 if (get != null) {
                     Result result = table.get(get);
                     if (!result.isEmpty()) {
-                        if (cache != null) {
-                            // parse and collect
-                            RowData rowData = serde.convertToNewRow(result);
-                            collect(rowData);
-                            cache.put(rowKey, rowData);
-                        } else {
-                            collect(serde.convertToReusedRow(result));
-                        }
+                        return 
Collections.singletonList(serde.convertToReusedRow(result));
                     }
                 }
                 break;
@@ -131,6 +113,7 @@ public class HBaseRowDataLookupFunction extends 
TableFunction<RowData> {
                 }
             }
         }
+        return Collections.emptyList();
     }
 
     private Configuration prepareRuntimeConfiguration() {
@@ -164,18 +147,6 @@ public class HBaseRowDataLookupFunction extends 
TableFunction<RowData> {
         try {
             hConnection = ConnectionFactory.createConnection(config);
             table = (HTable) 
hConnection.getTable(TableName.valueOf(hTableName));
-            this.cache =
-                    cacheMaxSize <= 0 || cacheExpireMs <= 0
-                            ? null
-                            : CacheBuilder.newBuilder()
-                                    .recordStats()
-                                    .expireAfterWrite(cacheExpireMs, 
TimeUnit.MILLISECONDS)
-                                    .maximumSize(cacheMaxSize)
-                                    .build();
-            if (cache != null) {
-                context.getMetricGroup()
-                        .gauge("lookupCacheHitRate", (Gauge<Double>) () -> 
cache.stats().hitRate());
-            }
         } catch (TableNotFoundException tnfe) {
             LOG.error("Table '{}' not found ", hTableName, tnfe);
             throw new RuntimeException("HBase table '" + hTableName + "' not 
found.", tnfe);
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index 4ecc816c371..0c8dc978551 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
@@ -93,6 +94,8 @@ public class HBaseConnectorOptions {
                     .defaultValue(false)
                     .withDescription("whether to set async lookup.");
 
+    /** @deprecated Please use {@link LookupOptions#PARTIAL_CACHE_MAX_ROWS} 
instead. */
+    @Deprecated
     public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
             ConfigOptions.key("lookup.cache.max-rows")
                     .longType()
@@ -102,12 +105,15 @@ public class HBaseConnectorOptions {
                                     + "be eliminated. \"cache.max-rows\" and 
\"cache.ttl\" options must all be specified if any of them is "
                                     + "specified. Cache is not enabled as 
default.");
 
+    /** @deprecated Please use {@link 
LookupOptions#PARTIAL_CACHE_EXPIRE_AFTER_WRITE} instead. */
+    @Deprecated
     public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
             ConfigOptions.key("lookup.cache.ttl")
                     .durationType()
                     .defaultValue(Duration.ofSeconds(0))
                     .withDescription("the cache time to live.");
 
+    /** @deprecated Please used {@link LookupOptions#MAX_RETRIES} instead. */
     public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
             ConfigOptions.key("lookup.max-retries")
                     .intType()
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 4585610821a..2141fe10523 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.hbase.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
 import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
 import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
 import org.apache.flink.connector.hbase.util.HBaseTableSchema;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
 import java.util.Map;
 import java.util.Properties;
 
-import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
@@ -95,16 +93,6 @@ public class HBaseConnectorOptionsUtil {
         return builder.build();
     }
 
-    public static HBaseLookupOptions getHBaseLookupOptions(ReadableConfig 
tableOptions) {
-        HBaseLookupOptions.Builder builder = HBaseLookupOptions.builder();
-        
builder.setLookupAsync(tableOptions.get(HBaseConnectorOptions.LOOKUP_ASYNC));
-        
builder.setMaxRetryTimes(tableOptions.get(HBaseConnectorOptions.LOOKUP_MAX_RETRIES));
-        builder.setCacheExpireMs(
-                
tableOptions.get(HBaseConnectorOptions.LOOKUP_CACHE_TTL).toMillis());
-        builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS));
-        return builder.build();
-    }
-
     /** config HBase Configuration. */
     public static Configuration getHBaseConfiguration(ReadableConfig 
tableOptions) {
         // create default configuration from current runtime env 
(`hbase-site.xml` in classpath)
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index d43828ab9bd..f33ac4e19c2 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -36,6 +36,7 @@ import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import static 
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
@@ -357,5 +358,37 @@ public class HBaseTableSchema implements Serializable {
             this.rowKeyType = rowKeyType;
             this.rowKeyIndex = rowKeyIndex;
         }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof RowKeyInfo)) {
+                return false;
+            }
+            RowKeyInfo that = (RowKeyInfo) o;
+            return Objects.equals(rowKeyName, that.rowKeyName)
+                    && Objects.equals(rowKeyType, that.rowKeyType)
+                    && Objects.equals(rowKeyIndex, that.rowKeyIndex);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(rowKeyName, rowKeyType, rowKeyIndex);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof HBaseTableSchema)) {
+            return false;
+        }
+        HBaseTableSchema that = (HBaseTableSchema) o;
+        return Objects.equals(familyMap, that.familyMap)
+                && Objects.equals(rowKeyInfo, that.rowKeyInfo)
+                && Objects.equals(charset, that.charset);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(familyMap, rowKeyInfo, charset);
     }
 }

Reply via email to