This is an automated email from the ASF dual-hosted git repository.
renqs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new bf81768ff56 [FLINK-28868][connector/hbase] Migrate HBase table
connector to the new LookupFunction interface
bf81768ff56 is described below
commit bf81768ff564c5bf4a57cb33c6f5126b83b28fb5
Author: Qingsheng Ren <[email protected]>
AuthorDate: Mon Aug 8 17:51:13 2022 +0800
[FLINK-28868][connector/hbase] Migrate HBase table connector to the new
LookupFunction interface
This closes #20495
---
.../b8900323-6aab-4e7e-9b17-f53b3c3dca46 | 12 +--
.../hbase1/HBase1DynamicTableFactory.java | 31 +++++-
.../hbase1/source/HBaseDynamicTableSource.java | 34 +++++--
.../hbase1/HBaseDynamicTableFactoryTest.java | 66 ++++++------
.../hbase2/HBase2DynamicTableFactory.java | 39 ++++++-
.../hbase2/source/HBaseDynamicTableSource.java | 66 +++++++++---
.../source/HBaseRowDataAsyncLookupFunction.java | 66 +++---------
.../hbase2/HBaseDynamicTableFactoryTest.java | 71 +++++++------
.../HBaseRowDataAsyncLookupFunctionTest.java | 23 ++---
.../hbase/options/HBaseLookupOptions.java | 113 ---------------------
.../source/AbstractHBaseDynamicTableSource.java | 37 +++++--
.../hbase/source/HBaseRowDataLookupFunction.java | 63 ++++--------
.../hbase/table/HBaseConnectorOptions.java | 6 ++
.../hbase/table/HBaseConnectorOptionsUtil.java | 12 ---
.../connector/hbase/util/HBaseTableSchema.java | 33 ++++++
15 files changed, 325 insertions(+), 347 deletions(-)
diff --git
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
index d3793a81c75..8ce680943af 100644
---
a/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
+++
b/flink-architecture-tests/flink-architecture-tests-production/archunit-violations/b8900323-6aab-4e7e-9b17-f53b3c3dca46
@@ -1,14 +1,14 @@
Constructor
<org.apache.flink.connector.hbase.sink.HBaseSinkFunction.<init>(java.lang.String,
org.apache.hadoop.conf.Configuration,
org.apache.flink.connector.hbase.sink.HBaseMutationConverter, long, long,
long)> has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseSinkFunction.java:0)
-Constructor
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(AbstractHBaseDynamicTableSource.java:0)
-Constructor
<org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseRowDataLookupFunction.java:0)
+Constructor
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, int,
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has
parameter of type <org.apache.hadoop.conf.Configuration> in
(AbstractHBaseDynamicTableSource.java:0)
+Constructor
<org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, int)> has parameter of type
<org.apache.hadoop.conf.Configuration> in (HBaseRowDataLookupFunction.java:0)
Constructor
<org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.<init>(java.lang.String,
org.apache.flink.connector.hbase.util.HBaseTableSchema,
org.apache.hadoop.conf.Configuration,
org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSink.java:0)
Constructor
<org.apache.flink.connector.hbase1.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(AbstractTableInputFormat.java:0)
-Constructor
<org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSource.java:0)
+Constructor
<org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, int,
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has
parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSource.java:0)
Constructor
<org.apache.flink.connector.hbase1.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration>
in (HBaseRowDataInputFormat.java:0)
Constructor
<org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink.<init>(java.lang.String,
org.apache.flink.connector.hbase.util.HBaseTableSchema,
org.apache.hadoop.conf.Configuration,
org.apache.flink.connector.hbase.options.HBaseWriteOptions, java.lang.String)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSink.java:0)
Constructor
<org.apache.flink.connector.hbase2.source.AbstractTableInputFormat.<init>(org.apache.hadoop.conf.Configuration)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(AbstractTableInputFormat.java:0)
-Constructor
<org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSource.java:0)
-Constructor
<org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, org.apache.flink.connector.hbase.options.HBaseLookupOptions)>
has parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseRowDataAsyncLookupFunction.java:0)
+Constructor
<org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, int, boolean,
org.apache.flink.table.connector.source.lookup.cache.LookupCache)> has
parameter of type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSource.java:0)
+Constructor
<org.apache.flink.connector.hbase2.source.HBaseRowDataAsyncLookupFunction.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String, int)> has parameter of type
<org.apache.hadoop.conf.Configuration> in
(HBaseRowDataAsyncLookupFunction.java:0)
Constructor
<org.apache.flink.connector.hbase2.source.HBaseRowDataInputFormat.<init>(org.apache.hadoop.conf.Configuration,
java.lang.String, org.apache.flink.connector.hbase.util.HBaseTableSchema,
java.lang.String)> has parameter of type <org.apache.hadoop.conf.Configuration>
in (HBaseRowDataInputFormat.java:0)
Field
<org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource.conf>
has type <org.apache.hadoop.conf.Configuration> in
(AbstractHBaseDynamicTableSource.java:0)
Field <org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink.hbaseConf>
has type <org.apache.hadoop.conf.Configuration> in
(HBaseDynamicTableSink.java:0)
@@ -92,4 +92,4 @@ Method
<org.apache.flink.connector.jdbc.internal.converter.OracleRowConverter.la
Method
<org.apache.flink.connector.jdbc.internal.converter.PostgresRowConverter.lambda$createPostgresArrayConverter$4f4cdb95$1(java.lang.Class,
org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter$JdbcDeserializationConverter,
java.lang.Object)> calls method <org.postgresql.jdbc.PgArray.getArray()> in
(PostgresRowConverter.java:90)
Method
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
calls method <org.apache.flink.streaming.api.scala.DataStream.javaStream()> in
(CassandraSink.java:205)
Method
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
has generic parameter type
<org.apache.flink.streaming.api.scala.DataStream<IN>> with type argument
depending on <org.apache.flink.streaming.api.scala.DataStream> in
(CassandraSink.java:0)
-Method
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in
(CassandraSink.java:0)
\ No newline at end of file
+Method
<org.apache.flink.streaming.connectors.cassandra.CassandraSink.addSink(org.apache.flink.streaming.api.scala.DataStream)>
has parameter of type <org.apache.flink.streaming.api.scala.DataStream> in
(CassandraSink.java:0)
diff --git
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 6a3e6ba9379..fbc793ca3ca 100644
---
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -27,12 +27,16 @@ import
org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.hadoop.conf.Configuration;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -52,7 +56,6 @@ import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKE
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
-import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
@@ -78,13 +81,31 @@ public class HBase1DynamicTableFactory
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
+ LookupCache cache = null;
+
+ // Backward compatible to legacy caching options
+ if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+ && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO)
> 0) {
+ cache =
+ DefaultLookupCache.newBuilder()
+
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+ .build();
+ }
+
+ if (tableOptions
+ .get(LookupOptions.CACHE_TYPE)
+ .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+ cache = DefaultLookupCache.fromConfig(tableOptions);
+ }
return new HBaseDynamicTableSource(
hbaseClientConf,
tableName,
hbaseSchema,
nullStringLiteral,
- getHBaseLookupOptions(tableOptions));
+ tableOptions.get(LookupOptions.MAX_RETRIES),
+ cache);
}
@Override
@@ -133,6 +154,12 @@ public class HBase1DynamicTableFactory
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
set.add(LOOKUP_MAX_RETRIES);
+ set.add(LookupOptions.CACHE_TYPE);
+ set.add(LookupOptions.MAX_RETRIES);
+ set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+ set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+ set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+ set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
return set;
}
diff --git
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
index 5375fd31ee1..0322c947ff3 100644
---
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
+++
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/source/HBaseDynamicTableSource.java
@@ -19,16 +19,19 @@
package org.apache.flink.connector.hbase1.source;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/** HBase table source implementation. */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
@@ -38,14 +41,15 @@ public class HBaseDynamicTableSource extends
AbstractHBaseDynamicTableSource {
String tableName,
HBaseTableSchema hbaseSchema,
String nullStringLiteral,
- HBaseLookupOptions lookupOptions) {
- super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+ int maxRetryTimes,
+ @Nullable LookupCache cache) {
+ super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes,
cache);
}
@Override
public DynamicTableSource copy() {
return new HBaseDynamicTableSource(
- conf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions);
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes, cache);
}
@Override
@@ -53,8 +57,22 @@ public class HBaseDynamicTableSource extends
AbstractHBaseDynamicTableSource {
return new HBaseRowDataInputFormat(conf, tableName, hbaseSchema,
nullStringLiteral);
}
- @VisibleForTesting
- public HBaseLookupOptions getLookupOptions() {
- return this.lookupOptions;
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HBaseDynamicTableSource)) {
+ return false;
+ }
+ HBaseDynamicTableSource that = (HBaseDynamicTableSource) o;
+ return Objects.equals(conf, that.conf)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(hbaseSchema, that.hbaseSchema)
+ && Objects.equals(nullStringLiteral, that.nullStringLiteral)
+ && Objects.equals(maxRetryTimes, that.maxRetryTimes)
+ && Objects.equals(cache, that.cache);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes, cache);
}
}
diff --git
a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
index b4e30b0ef9f..8a8f1d7068a 100644
---
a/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-hbase-1.4/src/test/java/org/apache/flink/connector/hbase1/HBaseDynamicTableFactoryTest.java
@@ -19,10 +19,8 @@
package org.apache.flink.connector.hbase1;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
-import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase1.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase1.source.HBaseDynamicTableSource;
@@ -32,8 +30,9 @@ import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.functions.LookupFunction;
import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
@@ -45,10 +44,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static
org.apache.flink.connector.hbase.util.HBaseConfigurationUtil.getHBaseConfiguration;
import static org.apache.flink.table.api.DataTypes.BIGINT;
import static org.apache.flink.table.api.DataTypes.BOOLEAN;
import static org.apache.flink.table.api.DataTypes.DATE;
@@ -62,6 +63,7 @@ import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -111,10 +113,10 @@ public class HBaseDynamicTableFactoryTest {
int[][] lookupKey = {{2}};
LookupTableSource.LookupRuntimeProvider lookupProvider =
hbaseSource.getLookupRuntimeProvider(new
LookupRuntimeProviderContext(lookupKey));
- assertTrue(lookupProvider instanceof TableFunctionProvider);
+ assertTrue(lookupProvider instanceof LookupFunctionProvider);
- TableFunction tableFunction =
- ((TableFunctionProvider) lookupProvider).createTableFunction();
+ LookupFunction tableFunction =
+ ((LookupFunctionProvider)
lookupProvider).createLookupFunction();
assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
assertEquals(
"testHBastTable", ((HBaseRowDataLookupFunction)
tableFunction).getHTableName());
@@ -141,6 +143,32 @@ public class HBaseDynamicTableFactoryTest {
hbaseSchema.getQualifierDataTypes("f4"));
}
+ @Test
+ public void testLookupOptions() {
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY,
STRING()));
+ Map<String, String> options = getAllOptions();
+ options.put("lookup.cache", "PARTIAL");
+ options.put("lookup.partial-cache.expire-after-access", "15213s");
+ options.put("lookup.partial-cache.expire-after-write", "18213s");
+ options.put("lookup.partial-cache.max-rows", "10000");
+ options.put("lookup.partial-cache.cache-missing-key", "false");
+ options.put("lookup.max-retries", "15513");
+
+ DynamicTableSource source = createTableSource(schema, options);
+ HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;
+ assertThat(((HBaseDynamicTableSource)
source).getMaxRetryTimes()).isEqualTo(15513);
+
assertThat(hbaseSource.getCache()).isInstanceOf(DefaultLookupCache.class);
+ DefaultLookupCache cache = (DefaultLookupCache) hbaseSource.getCache();
+ assertThat(cache)
+ .isEqualTo(
+ DefaultLookupCache.newBuilder()
+ .expireAfterAccess(Duration.ofSeconds(15213))
+ .expireAfterWrite(Duration.ofSeconds(18213))
+ .maximumSize(10000)
+ .cacheMissingKey(false)
+ .build());
+ }
+
@Test
public void testTableSinkFactory() {
ResolvedSchema schema =
@@ -184,8 +212,7 @@ public class HBaseDynamicTableFactoryTest {
hbaseSchema.getQualifierDataTypes("f4"));
// verify hadoop Configuration
- org.apache.hadoop.conf.Configuration expectedConfiguration =
- HBaseConfigurationUtil.getHBaseConfiguration();
+ org.apache.hadoop.conf.Configuration expectedConfiguration =
getHBaseConfiguration();
expectedConfiguration.set(HConstants.ZOOKEEPER_QUORUM,
"localhost:2181");
expectedConfiguration.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/flink");
expectedConfiguration.set("hbase.security.authentication", "kerberos");
@@ -245,27 +272,6 @@ public class HBaseDynamicTableFactoryTest {
assertEquals(2, (long) provider.getParallelism().get());
}
- @Test
- public void testLookupOptions() {
- Map<String, String> options = getAllOptions();
- options.put("lookup.cache.max-rows", "1000");
- options.put("lookup.cache.ttl", "10s");
- options.put("lookup.max-retries", "10");
- ResolvedSchema schema =
- ResolvedSchema.of(
- Column.physical(ROWKEY, STRING()),
- Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()),
FIELD(COL2, INT()))));
- DynamicTableSource source = createTableSource(schema, options);
- HBaseLookupOptions actual = ((HBaseDynamicTableSource)
source).getLookupOptions();
- HBaseLookupOptions expected =
- HBaseLookupOptions.builder()
- .setCacheMaxSize(1000)
- .setCacheExpireMs(10_000)
- .setMaxRetryTimes(10)
- .build();
- assertEquals(expected, actual);
- }
-
@Test
public void testDisabledBufferFlushOptions() {
Map<String, String> options = getAllOptions();
diff --git
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index 9b9a5258823..8c5513710e3 100644
---
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -21,19 +21,22 @@ package org.apache.flink.connector.hbase2;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.sink.HBaseDynamicTableSink;
import org.apache.flink.connector.hbase2.source.HBaseDynamicTableSource;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
import org.apache.hadoop.conf.Configuration;
+import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
@@ -53,7 +56,6 @@ import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKE
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.ZOOKEEPER_ZNODE_PARENT;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.PROPERTIES_PREFIX;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseConfiguration;
-import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseLookupOptions;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.getHBaseWriteOptions;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptionsUtil.validatePrimaryKey;
import static
org.apache.flink.table.factories.FactoryUtil.createTableFactoryHelper;
@@ -76,13 +78,36 @@ public class HBase2DynamicTableFactory
String tableName = tableOptions.get(TABLE_NAME);
Configuration hbaseConf = getHBaseConfiguration(tableOptions);
- HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
+ LookupCache cache = null;
+
+ // Backward compatible to legacy caching options
+ if (tableOptions.get(LOOKUP_CACHE_MAX_ROWS) > 0
+ && tableOptions.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO)
> 0) {
+ cache =
+ DefaultLookupCache.newBuilder()
+
.maximumSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS))
+
.expireAfterWrite(tableOptions.get(LOOKUP_CACHE_TTL))
+ .build();
+ }
+
+ if (tableOptions
+ .get(LookupOptions.CACHE_TYPE)
+ .equals(LookupOptions.LookupCacheType.PARTIAL)) {
+ cache = DefaultLookupCache.fromConfig(tableOptions);
+ }
+
return new HBaseDynamicTableSource(
- hbaseConf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions);
+ hbaseConf,
+ tableName,
+ hbaseSchema,
+ nullStringLiteral,
+ tableOptions.get(LookupOptions.MAX_RETRIES),
+ tableOptions.get(LOOKUP_ASYNC),
+ cache);
}
@Override
@@ -131,6 +156,12 @@ public class HBase2DynamicTableFactory
set.add(LOOKUP_CACHE_MAX_ROWS);
set.add(LOOKUP_CACHE_TTL);
set.add(LOOKUP_MAX_RETRIES);
+ set.add(LookupOptions.CACHE_TYPE);
+ set.add(LookupOptions.MAX_RETRIES);
+ set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS);
+ set.add(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE);
+ set.add(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY);
+ set.add(LookupOptions.PARTIAL_CACHE_MAX_ROWS);
return set;
}
diff --git
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
index 806455c3b5d..7c88703ca96 100644
---
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
+++
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
@@ -19,33 +19,43 @@
package org.apache.flink.connector.hbase2.source;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.source.AbstractHBaseDynamicTableSource;
import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
+import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import
org.apache.flink.table.connector.source.lookup.PartialCachingAsyncLookupProvider;
+import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
import static org.apache.flink.util.Preconditions.checkArgument;
/** HBase table source implementation. */
@Internal
public class HBaseDynamicTableSource extends AbstractHBaseDynamicTableSource {
+ private final boolean lookupAsync;
+
public HBaseDynamicTableSource(
Configuration conf,
String tableName,
HBaseTableSchema hbaseSchema,
String nullStringLiteral,
- HBaseLookupOptions lookupOptions) {
- super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+ int maxRetryTimes,
+ boolean lookupAsync,
+ @Nullable LookupCache cache) {
+ super(conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes,
cache);
+ this.lookupAsync = lookupAsync;
}
@Override
@@ -61,21 +71,31 @@ public class HBaseDynamicTableSource extends
AbstractHBaseDynamicTableSource {
.get(context.getKeys()[0][0])
.equals(hbaseSchema.getRowKeyName().get()),
"Currently, HBase table only supports lookup by rowkey
field.");
- if (lookupOptions.getLookupAsync()) {
- return AsyncTableFunctionProvider.of(
+ if (lookupAsync) {
+ HBaseRowDataAsyncLookupFunction asyncLookupFunction =
new HBaseRowDataAsyncLookupFunction(
- conf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions));
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes);
+ if (cache != null) {
+ return
PartialCachingAsyncLookupProvider.of(asyncLookupFunction, cache);
+ } else {
+ return AsyncLookupFunctionProvider.of(asyncLookupFunction);
+ }
} else {
- return TableFunctionProvider.of(
+ HBaseRowDataLookupFunction lookupFunction =
new HBaseRowDataLookupFunction(
- conf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions));
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes);
+ if (cache != null) {
+ return PartialCachingLookupProvider.of(lookupFunction, cache);
+ } else {
+ return LookupFunctionProvider.of(lookupFunction);
+ }
}
}
@Override
public DynamicTableSource copy() {
return new HBaseDynamicTableSource(
- conf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions);
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes, lookupAsync, cache);
}
@Override
@@ -83,8 +103,24 @@ public class HBaseDynamicTableSource extends
AbstractHBaseDynamicTableSource {
return new HBaseRowDataInputFormat(conf, tableName, hbaseSchema,
nullStringLiteral);
}
- @VisibleForTesting
- public HBaseLookupOptions getLookupOptions() {
- return this.lookupOptions;
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HBaseDynamicTableSource)) {
+ return false;
+ }
+ HBaseDynamicTableSource that = (HBaseDynamicTableSource) o;
+ return Objects.equals(conf, that.conf)
+ && Objects.equals(tableName, that.tableName)
+ && Objects.equals(hbaseSchema, that.hbaseSchema)
+ && Objects.equals(nullStringLiteral, that.nullStringLiteral)
+ && Objects.equals(maxRetryTimes, that.maxRetryTimes)
+ && Objects.equals(cache, that.cache)
+ && Objects.equals(lookupAsync, that.lookupAsync);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes, cache, lookupAsync);
}
}
diff --git
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
index a41c7a595ba..697d5fd9eab 100644
---
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
+++
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
@@ -20,21 +20,16 @@ package org.apache.flink.connector.hbase2.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -56,14 +51,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
/**
* The HBaseRowDataAsyncLookupFunction is an implementation to lookup HBase
data by rowkey in async
* fashion. It looks up the result as {@link RowData}.
*/
@Internal
-public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+public class HBaseRowDataAsyncLookupFunction extends AsyncLookupFunction {
private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
@@ -78,10 +72,7 @@ public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData>
private transient AsyncTable<ScanResultConsumer> table;
private transient HBaseSerde serde;
- private final long cacheMaxSize;
- private final long cacheExpireMs;
private final int maxRetryTimes;
- private transient Cache<Object, RowData> cache;
/** The size for thread pool. */
private static final int THREAD_POOL_SIZE = 16;
@@ -91,14 +82,12 @@ public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData>
String hTableName,
HBaseTableSchema hbaseTableSchema,
String nullStringLiteral,
- HBaseLookupOptions lookupOptions) {
+ int maxRetryTimes) {
this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
this.hTableName = hTableName;
this.hbaseTableSchema = hbaseTableSchema;
this.nullStringLiteral = nullStringLiteral;
- this.cacheMaxSize = lookupOptions.getCacheMaxSize();
- this.cacheExpireMs = lookupOptions.getCacheExpireMs();
- this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ this.maxRetryTimes = maxRetryTimes;
}
@Override
@@ -115,19 +104,6 @@ public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData>
try {
asyncConnection = asyncConnectionFuture.get();
table = asyncConnection.getTable(TableName.valueOf(hTableName),
threadPool);
-
- this.cache =
- cacheMaxSize <= 0 || cacheExpireMs <= 0
- ? null
- : CacheBuilder.newBuilder()
- .recordStats()
- .expireAfterWrite(cacheExpireMs,
TimeUnit.MILLISECONDS)
- .maximumSize(cacheMaxSize)
- .build();
- if (cache != null && context != null) {
- context.getMetricGroup()
- .gauge("lookupCacheHitRate", (Gauge<Double>) () ->
cache.stats().hitRate());
- }
} catch (InterruptedException | ExecutionException e) {
LOG.error("Exception while creating connection to HBase.", e);
throw new RuntimeException("Cannot create connection to HBase.",
e);
@@ -139,24 +115,15 @@ public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData>
/**
* The invoke entry point of lookup function.
*
- * @param future The result or exception is returned.
- * @param rowKey the lookup key. Currently only support single rowkey.
+ * @param keyRow A {@link RowData} that wraps lookup keys. Currently only
support single rowkey.
*/
- public void eval(CompletableFuture<Collection<RowData>> future, Object
rowKey) {
+ @Override
+ public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
int currentRetry = 0;
- if (cache != null) {
- RowData cacheRowData = cache.getIfPresent(rowKey);
- if (cacheRowData != null) {
- if (cacheRowData.getArity() == 0) {
- future.complete(Collections.emptyList());
- } else {
- future.complete(Collections.singletonList(cacheRowData));
- }
- return;
- }
- }
+ CompletableFuture<Collection<RowData>> future = new
CompletableFuture<>();
// fetch result
- fetchResult(future, currentRetry, rowKey);
+ fetchResult(future, currentRetry, ((GenericRowData)
keyRow).getField(0));
+ return future;
}
/**
@@ -199,18 +166,9 @@ public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData>
} else {
if (result.isEmpty()) {
resultFuture.complete(Collections.emptyList());
- if (cache != null) {
- cache.put(rowKey, new GenericRowData(0));
- }
} else {
- if (cache != null) {
- RowData rowData =
serde.convertToNewRow(result);
-
resultFuture.complete(Collections.singletonList(rowData));
- cache.put(rowKey, rowData);
- } else {
- resultFuture.complete(
-
Collections.singletonList(serde.convertToNewRow(result)));
- }
+ resultFuture.complete(
+
Collections.singletonList(serde.convertToNewRow(result)));
}
}
});
diff --git
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
index 7d0f900ab3c..a535f0e0c58 100644
---
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
+++
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseDynamicTableFactoryTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.connector.hbase2;
import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.source.HBaseRowDataLookupFunction;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
@@ -31,12 +30,13 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
-import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
-import org.apache.flink.table.functions.AsyncTableFunction;
-import org.apache.flink.table.functions.TableFunction;
+import
org.apache.flink.table.connector.source.lookup.AsyncLookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
+import org.apache.flink.table.functions.AsyncLookupFunction;
+import org.apache.flink.table.functions.LookupFunction;
import
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import
org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
@@ -48,6 +48,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -65,6 +66,7 @@ import static org.apache.flink.table.api.DataTypes.TIME;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
import static
org.apache.flink.table.factories.utils.FactoryMocks.createTableSource;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -114,10 +116,10 @@ public class HBaseDynamicTableFactoryTest {
int[][] lookupKey = {{2}};
LookupTableSource.LookupRuntimeProvider lookupProvider =
hbaseSource.getLookupRuntimeProvider(new
LookupRuntimeProviderContext(lookupKey));
- assertTrue(lookupProvider instanceof TableFunctionProvider);
+ assertTrue(lookupProvider instanceof LookupFunctionProvider);
- TableFunction tableFunction =
- ((TableFunctionProvider) lookupProvider).createTableFunction();
+ LookupFunction tableFunction =
+ ((LookupFunctionProvider)
lookupProvider).createLookupFunction();
assertTrue(tableFunction instanceof HBaseRowDataLookupFunction);
assertEquals(
"testHBastTable", ((HBaseRowDataLookupFunction)
tableFunction).getHTableName());
@@ -144,6 +146,32 @@ public class HBaseDynamicTableFactoryTest {
hbaseSchema.getQualifierDataTypes("f4"));
}
+ @Test
+ public void testLookupOptions() {
+ ResolvedSchema schema = ResolvedSchema.of(Column.physical(ROWKEY,
STRING()));
+ Map<String, String> options = getAllOptions();
+ options.put("lookup.cache", "PARTIAL");
+ options.put("lookup.partial-cache.expire-after-access", "15213s");
+ options.put("lookup.partial-cache.expire-after-write", "18213s");
+ options.put("lookup.partial-cache.max-rows", "10000");
+ options.put("lookup.partial-cache.cache-missing-key", "false");
+ options.put("lookup.max-retries", "15513");
+
+ DynamicTableSource source = createTableSource(schema, options);
+ HBaseDynamicTableSource hbaseSource = (HBaseDynamicTableSource) source;
+ assertThat(((HBaseDynamicTableSource)
source).getMaxRetryTimes()).isEqualTo(15513);
+
assertThat(hbaseSource.getCache()).isInstanceOf(DefaultLookupCache.class);
+ DefaultLookupCache cache = (DefaultLookupCache) hbaseSource.getCache();
+ assertThat(cache)
+ .isEqualTo(
+ DefaultLookupCache.newBuilder()
+ .expireAfterAccess(Duration.ofSeconds(15213))
+ .expireAfterWrite(Duration.ofSeconds(18213))
+ .maximumSize(10000)
+ .cacheMissingKey(false)
+ .build());
+ }
+
@Test
public void testTableSinkFactory() {
ResolvedSchema schema =
@@ -248,27 +276,6 @@ public class HBaseDynamicTableFactoryTest {
assertEquals(2, (long) provider.getParallelism().get());
}
- @Test
- public void testLookupOptions() {
- Map<String, String> options = getAllOptions();
- options.put("lookup.cache.max-rows", "1000");
- options.put("lookup.cache.ttl", "10s");
- options.put("lookup.max-retries", "10");
- ResolvedSchema schema =
- ResolvedSchema.of(
- Column.physical(ROWKEY, STRING()),
- Column.physical(FAMILY1, ROW(FIELD(COL1, DOUBLE()),
FIELD(COL2, INT()))));
- DynamicTableSource source = createTableSource(schema, options);
- HBaseLookupOptions actual = ((HBaseDynamicTableSource)
source).getLookupOptions();
- HBaseLookupOptions expected =
- HBaseLookupOptions.builder()
- .setCacheMaxSize(1000)
- .setCacheExpireMs(10_000)
- .setMaxRetryTimes(10)
- .build();
- assertEquals(expected, actual);
- }
-
@Test
public void testLookupAsync() {
Map<String, String> options = getAllOptions();
@@ -284,10 +291,10 @@ public class HBaseDynamicTableFactoryTest {
int[][] lookupKey = {{0}};
LookupTableSource.LookupRuntimeProvider lookupProvider =
hbaseSource.getLookupRuntimeProvider(new
LookupRuntimeProviderContext(lookupKey));
- assertTrue(lookupProvider instanceof AsyncTableFunctionProvider);
+ assertTrue(lookupProvider instanceof AsyncLookupFunctionProvider);
- AsyncTableFunction asyncTableFunction =
- ((AsyncTableFunctionProvider)
lookupProvider).createAsyncTableFunction();
+ AsyncLookupFunction asyncTableFunction =
+ ((AsyncLookupFunctionProvider)
lookupProvider).createAsyncLookupFunction();
assertTrue(asyncTableFunction instanceof
HBaseRowDataAsyncLookupFunction);
assertEquals(
"testHBastTable",
diff --git
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
index 2f8cba68804..312041f220f 100644
---
a/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
+++
b/flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
@@ -18,18 +18,16 @@
package org.apache.flink.connector.hbase2.source;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.connector.hbase2.util.HBaseTestBase;
import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import java.util.ArrayList;
import java.util.Collection;
@@ -48,15 +46,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
-@RunWith(Parameterized.class)
public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
- @Parameterized.Parameter public boolean useCache;
-
- @Parameterized.Parameters(name = "use cache = {0}")
- public static Object[] parameters() {
- return new Object[][] {new Object[] {true}, new Object[] {false}};
- }
-
@Test
public void testEval() throws Exception {
HBaseRowDataAsyncLookupFunction lookupFunction =
buildRowDataAsyncLookupFunction();
@@ -99,11 +89,6 @@ public class HBaseRowDataAsyncLookupFunctionTest extends
HBaseTestBase {
}
private HBaseRowDataAsyncLookupFunction buildRowDataAsyncLookupFunction() {
- HBaseLookupOptions lookupOptions =
HBaseLookupOptions.builder().build();
- if (useCache) {
- lookupOptions =
-
HBaseLookupOptions.builder().setCacheMaxSize(4).setCacheExpireMs(10000).build();
- }
DataType dataType =
ROW(
FIELD(ROW_KEY, INT()),
@@ -117,6 +102,10 @@ public class HBaseRowDataAsyncLookupFunctionTest extends
HBaseTestBase {
FIELD(F3COL3, STRING()))));
HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(dataType);
return new HBaseRowDataAsyncLookupFunction(
- getConf(), TEST_TABLE_1, hbaseSchema, "null", lookupOptions);
+ getConf(),
+ TEST_TABLE_1,
+ hbaseSchema,
+ "null",
+ LookupOptions.MAX_RETRIES.defaultValue());
}
}
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
deleted file mode 100644
index 192eb077097..00000000000
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseLookupOptions.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.hbase.options;
-
-import org.apache.flink.annotation.Internal;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** Options for the HBase lookup. */
-@Internal
-public class HBaseLookupOptions implements Serializable {
- private static final long serialVersionUID = 1L;
- private static final int DEFAULT_MAX_RETRY_TIMES = 3;
-
- private final long cacheMaxSize;
- private final long cacheExpireMs;
- private final int maxRetryTimes;
- private final boolean lookupAsync;
-
- public HBaseLookupOptions(
- long cacheMaxSize, long cacheExpireMs, int maxRetryTimes, boolean
lookupAsync) {
- this.cacheMaxSize = cacheMaxSize;
- this.cacheExpireMs = cacheExpireMs;
- this.maxRetryTimes = maxRetryTimes;
- this.lookupAsync = lookupAsync;
- }
-
- public long getCacheMaxSize() {
- return cacheMaxSize;
- }
-
- public long getCacheExpireMs() {
- return cacheExpireMs;
- }
-
- public int getMaxRetryTimes() {
- return maxRetryTimes;
- }
-
- public boolean getLookupAsync() {
- return lookupAsync;
- }
-
- public static Builder builder() {
- return new Builder();
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof HBaseLookupOptions) {
- HBaseLookupOptions options = (HBaseLookupOptions) o;
- return Objects.equals(cacheMaxSize, options.cacheMaxSize)
- && Objects.equals(cacheExpireMs, options.cacheExpireMs)
- && Objects.equals(maxRetryTimes, options.maxRetryTimes)
- && Objects.equals(lookupAsync, options.lookupAsync);
- } else {
- return false;
- }
- }
-
- /** Builder of {@link HBaseLookupOptions}. */
- public static class Builder {
- private long cacheMaxSize = -1L;
- private long cacheExpireMs = 0L;
- private int maxRetryTimes = DEFAULT_MAX_RETRY_TIMES;
- private boolean lookupAsync = false;
-
- /** optional, lookup cache max size, over this value, the old data
will be eliminated. */
- public Builder setCacheMaxSize(long cacheMaxSize) {
- this.cacheMaxSize = cacheMaxSize;
- return this;
- }
-
- /** optional, lookup cache expire mills, over this time, the old data
will expire. */
- public Builder setCacheExpireMs(long cacheExpireMs) {
- this.cacheExpireMs = cacheExpireMs;
- return this;
- }
-
- /** optional, max retry times for Hbase connector. */
- public Builder setMaxRetryTimes(int maxRetryTimes) {
- this.maxRetryTimes = maxRetryTimes;
- return this;
- }
-
- /** optional, whether to set async lookup. */
- public Builder setLookupAsync(boolean lookupAsync) {
- this.lookupAsync = lookupAsync;
- return this;
- }
-
- public HBaseLookupOptions build() {
- return new HBaseLookupOptions(cacheMaxSize, cacheExpireMs,
maxRetryTimes, lookupAsync);
- }
- }
-}
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
index bf1aa01fa04..c530cde003a 100644
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
+++
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
@@ -21,20 +21,23 @@ package org.apache.flink.connector.hbase.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
-import org.apache.flink.table.connector.source.TableFunctionProvider;
import
org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
+import
org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.hadoop.conf.Configuration;
+import javax.annotation.Nullable;
+
import static org.apache.flink.util.Preconditions.checkArgument;
/** HBase table source implementation. */
@@ -46,19 +49,22 @@ public abstract class AbstractHBaseDynamicTableSource
protected final String tableName;
protected HBaseTableSchema hbaseSchema;
protected final String nullStringLiteral;
- protected final HBaseLookupOptions lookupOptions;
+ protected final int maxRetryTimes;
+ @Nullable protected final LookupCache cache;
public AbstractHBaseDynamicTableSource(
Configuration conf,
String tableName,
HBaseTableSchema hbaseSchema,
String nullStringLiteral,
- HBaseLookupOptions lookupOptions) {
+ int maxRetryTimes,
+ @Nullable LookupCache cache) {
this.conf = conf;
this.tableName = tableName;
this.hbaseSchema = hbaseSchema;
this.nullStringLiteral = nullStringLiteral;
- this.lookupOptions = lookupOptions;
+ this.maxRetryTimes = maxRetryTimes;
+ this.cache = cache;
}
@Override
@@ -81,10 +87,14 @@ public abstract class AbstractHBaseDynamicTableSource
.get(context.getKeys()[0][0])
.equals(hbaseSchema.getRowKeyName().get()),
"Currently, HBase table only supports lookup by rowkey
field.");
-
- return TableFunctionProvider.of(
+ HBaseRowDataLookupFunction lookupFunction =
new HBaseRowDataLookupFunction(
- conf, tableName, hbaseSchema, nullStringLiteral,
lookupOptions));
+ conf, tableName, hbaseSchema, nullStringLiteral,
maxRetryTimes);
+ if (cache != null) {
+ return PartialCachingLookupProvider.of(lookupFunction, cache);
+ } else {
+ return LookupFunctionProvider.of(lookupFunction);
+ }
}
@Override
@@ -116,4 +126,15 @@ public abstract class AbstractHBaseDynamicTableSource
public HBaseTableSchema getHBaseTableSchema() {
return this.hbaseSchema;
}
+
+ @VisibleForTesting
+ public int getMaxRetryTimes() {
+ return maxRetryTimes;
+ }
+
+ @VisibleForTesting
+ @Nullable
+ public LookupCache getCache() {
+ return cache;
+ }
}
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
index 7d7acbccee4..0e1ba542e5b 100644
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
+++
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/HBaseRowDataLookupFunction.java
@@ -20,19 +20,15 @@ package org.apache.flink.connector.hbase.source;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseSerde;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
-import org.apache.flink.metrics.Gauge;
+import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
-import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.util.StringUtils;
-import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -46,7 +42,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.concurrent.TimeUnit;
+import java.util.Collection;
+import java.util.Collections;
/**
* The HBaseRowDataLookupFunction is a standard user-defined table function,
it can be used in
@@ -54,7 +51,7 @@ import java.util.concurrent.TimeUnit;
* RowData}.
*/
@Internal
-public class HBaseRowDataLookupFunction extends TableFunction<RowData> {
+public class HBaseRowDataLookupFunction extends LookupFunction {
private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataLookupFunction.class);
private static final long serialVersionUID = 1L;
@@ -68,54 +65,39 @@ public class HBaseRowDataLookupFunction extends
TableFunction<RowData> {
private transient HTable table;
private transient HBaseSerde serde;
- private final long cacheMaxSize;
- private final long cacheExpireMs;
private final int maxRetryTimes;
- private transient Cache<Object, RowData> cache;
public HBaseRowDataLookupFunction(
Configuration configuration,
String hTableName,
HBaseTableSchema hbaseTableSchema,
String nullStringLiteral,
- HBaseLookupOptions lookupOptions) {
+ int maxRetryTimes) {
this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
this.hTableName = hTableName;
this.hbaseTableSchema = hbaseTableSchema;
this.nullStringLiteral = nullStringLiteral;
- this.cacheMaxSize = lookupOptions.getCacheMaxSize();
- this.cacheExpireMs = lookupOptions.getCacheExpireMs();
- this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ this.maxRetryTimes = maxRetryTimes;
}
/**
* The invoke entry point of lookup function.
*
- * @param rowKey the lookup key. Currently only support single rowkey.
+ * @param keyRow - A {@link RowData} that wraps lookup keys. Currently
only support single
+ * rowkey.
*/
- public void eval(Object rowKey) throws IOException {
- if (cache != null) {
- RowData cacheRowData = cache.getIfPresent(rowKey);
- if (cacheRowData != null) {
- collect(cacheRowData);
- return;
- }
- }
+ @Override
+ public Collection<RowData> lookup(RowData keyRow) throws IOException {
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
- // fetch result
- Get get = serde.createGet(rowKey);
+ // TODO: The implementation of LookupFunction will pass a
GenericRowData as key row
+ // and it's safe to cast for now. We need to update the logic
once we improve the
+ // LookupFunction in the future.
+ Get get = serde.createGet(((GenericRowData)
keyRow).getField(0));
if (get != null) {
Result result = table.get(get);
if (!result.isEmpty()) {
- if (cache != null) {
- // parse and collect
- RowData rowData = serde.convertToNewRow(result);
- collect(rowData);
- cache.put(rowKey, rowData);
- } else {
- collect(serde.convertToReusedRow(result));
- }
+ return
Collections.singletonList(serde.convertToReusedRow(result));
}
}
break;
@@ -131,6 +113,7 @@ public class HBaseRowDataLookupFunction extends
TableFunction<RowData> {
}
}
}
+ return Collections.emptyList();
}
private Configuration prepareRuntimeConfiguration() {
@@ -164,18 +147,6 @@ public class HBaseRowDataLookupFunction extends
TableFunction<RowData> {
try {
hConnection = ConnectionFactory.createConnection(config);
table = (HTable)
hConnection.getTable(TableName.valueOf(hTableName));
- this.cache =
- cacheMaxSize <= 0 || cacheExpireMs <= 0
- ? null
- : CacheBuilder.newBuilder()
- .recordStats()
- .expireAfterWrite(cacheExpireMs,
TimeUnit.MILLISECONDS)
- .maximumSize(cacheMaxSize)
- .build();
- if (cache != null) {
- context.getMetricGroup()
- .gauge("lookupCacheHitRate", (Gauge<Double>) () ->
cache.stats().hitRate());
- }
} catch (TableNotFoundException tnfe) {
LOG.error("Table '{}' not found ", hTableName, tnfe);
throw new RuntimeException("HBase table '" + hTableName + "' not
found.", tnfe);
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
index 4ecc816c371..0c8dc978551 100644
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
+++
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptions.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.factories.FactoryUtil;
import java.time.Duration;
@@ -93,6 +94,8 @@ public class HBaseConnectorOptions {
.defaultValue(false)
.withDescription("whether to set async lookup.");
+ /** @deprecated Please use {@link LookupOptions#PARTIAL_CACHE_MAX_ROWS}
instead. */
+ @Deprecated
public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
ConfigOptions.key("lookup.cache.max-rows")
.longType()
@@ -102,12 +105,15 @@ public class HBaseConnectorOptions {
+ "be eliminated. \"cache.max-rows\" and
\"cache.ttl\" options must all be specified if any of them is "
+ "specified. Cache is not enabled as
default.");
+ /** @deprecated Please use {@link
LookupOptions#PARTIAL_CACHE_EXPIRE_AFTER_WRITE} instead. */
+ @Deprecated
public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
ConfigOptions.key("lookup.cache.ttl")
.durationType()
.defaultValue(Duration.ofSeconds(0))
.withDescription("the cache time to live.");
+ /** @deprecated Please used {@link LookupOptions#MAX_RETRIES} instead. */
public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
ConfigOptions.key("lookup.max-retries")
.intType()
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 4585610821a..2141fe10523 100644
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.hbase.table;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
import org.apache.flink.connector.hbase.options.HBaseWriteOptions;
import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
import org.apache.flink.connector.hbase.util.HBaseTableSchema;
@@ -32,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
import java.util.Map;
import java.util.Properties;
-import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_INTERVAL;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_ROWS;
import static
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.SINK_BUFFER_FLUSH_MAX_SIZE;
@@ -95,16 +93,6 @@ public class HBaseConnectorOptionsUtil {
return builder.build();
}
- public static HBaseLookupOptions getHBaseLookupOptions(ReadableConfig
tableOptions) {
- HBaseLookupOptions.Builder builder = HBaseLookupOptions.builder();
-
builder.setLookupAsync(tableOptions.get(HBaseConnectorOptions.LOOKUP_ASYNC));
-
builder.setMaxRetryTimes(tableOptions.get(HBaseConnectorOptions.LOOKUP_MAX_RETRIES));
- builder.setCacheExpireMs(
-
tableOptions.get(HBaseConnectorOptions.LOOKUP_CACHE_TTL).toMillis());
- builder.setCacheMaxSize(tableOptions.get(LOOKUP_CACHE_MAX_ROWS));
- return builder.build();
- }
-
/** config HBase Configuration. */
public static Configuration getHBaseConfiguration(ReadableConfig
tableOptions) {
// create default configuration from current runtime env
(`hbase-site.xml` in classpath)
diff --git
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
index d43828ab9bd..f33ac4e19c2 100644
---
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
+++
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseTableSchema.java
@@ -36,6 +36,7 @@ import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import static
org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType;
@@ -357,5 +358,37 @@ public class HBaseTableSchema implements Serializable {
this.rowKeyType = rowKeyType;
this.rowKeyIndex = rowKeyIndex;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof RowKeyInfo)) {
+ return false;
+ }
+ RowKeyInfo that = (RowKeyInfo) o;
+ return Objects.equals(rowKeyName, that.rowKeyName)
+ && Objects.equals(rowKeyType, that.rowKeyType)
+ && Objects.equals(rowKeyIndex, that.rowKeyIndex);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowKeyName, rowKeyType, rowKeyIndex);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof HBaseTableSchema)) {
+ return false;
+ }
+ HBaseTableSchema that = (HBaseTableSchema) o;
+ return Objects.equals(familyMap, that.familyMap)
+ && Objects.equals(rowKeyInfo, that.rowKeyInfo)
+ && Objects.equals(charset, that.charset);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(familyMap, rowKeyInfo, charset);
}
}