leonardBang commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r570712863
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
##########
@@ -35,17 +42,45 @@ public HBaseDynamicTableSource(
Configuration conf,
String tableName,
HBaseTableSchema hbaseSchema,
- String nullStringLiteral) {
- super(conf, tableName, hbaseSchema, nullStringLiteral);
+ String nullStringLiteral,
+ HBaseLookupOptions lookupOptions) {
+ super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+ }
+
+ @Override
+ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext
context) {
+ checkArgument(context.getKeys().length == 1 &&
context.getKeys()[0].length == 1,
+ "Currently, HBase table can only be lookup by single rowkey.");
+ checkArgument(
+ hbaseSchema.getRowKeyName().isPresent(),
+ "HBase schema must have a row key when used in lookup mode.");
+ checkArgument(
+ hbaseSchema
+ .convertsToTableSchema()
+ .getTableColumn(context.getKeys()[0][0])
+ .filter(f ->
f.getName().equals(hbaseSchema.getRowKeyName().get()))
+ .isPresent(),
+ "Currently, HBase table only supports lookup by rowkey field.");
+ boolean isAsync = lookupOptions.getLookupAsync();
+ if (isAsync){
Review comment:
```suggestion
if (lookupOptions.getLookupAsync()){
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
Review comment:
redundant blank lines
##########
File path:
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/options/HBaseOptions.java
##########
@@ -94,6 +94,33 @@
+ "Can be set to '0' to disable it. Note,
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush
interval set allowing for complete async processing of buffered actions.");
+ public static final ConfigOption<Boolean> LOOKUP_ASYNC =
+ ConfigOptions.key("lookup.async")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("whether to set async lookup.");
+
+ public static final ConfigOption<Long> LOOKUP_CACHE_MAX_ROWS =
+ ConfigOptions.key("lookup.cache.max-rows")
+ .longType()
+ .defaultValue(-1L)
+ .withDescription(
+ "the max number of rows of lookup cache, over this value, the
oldest rows will "
+ + "be eliminated. \"cache.max-rows\" and \"cache.ttl\"
options must all be specified if any of them is "
+ + "specified. Cache is not enabled as default.");
+
+ public static final ConfigOption<Duration> LOOKUP_CACHE_TTL =
+ ConfigOptions.key("lookup.cache.ttl")
+ .durationType()
+ .defaultValue(Duration.ofSeconds(0))
+ .withDescription("the cache time to live.");
+
+ public static final ConfigOption<Integer> LOOKUP_MAX_RETRIES =
+ ConfigOptions.key("lookup.max-retries")
+ .intType()
+ .defaultValue(3)
+ .withDescription("the max retry times if lookup database failed.");
Review comment:
These three options should not be tied to `LOOKUP_ASYNC`. If we plan to
support them, we also need to support the cache in `HBaseRowDataLookupFunction`;
otherwise these options have no effect when the user has not enabled `LOOKUP_ASYNC`.
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+ private static final long serialVersionUID = 1L;
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+ private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
+
+ private transient AsyncConnection asyncConnection;
+ private transient AsyncTable<ScanResultConsumer> table;
+ private transient HBaseSerde serde;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private transient Cache<Object, RowData> cache;
+
+ public HBaseRowDataAsyncLookupFunction(
+ Configuration configuration,
+ String hTableName,
+ HBaseTableSchema hbaseTableSchema,
+ String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+ this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
+ this.hTableName = hTableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.nullStringLiteral = nullStringLiteral;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ }
+
+ /**
+ * The invoke entry point of lookup function.
+ * @param feature The result or exception is returned.
+ * @param rowKey the lookup key. Currently only support single rowkey.
+ */
+ public void eval(CompletableFuture<Collection<RowData>> feature, Object
rowKey) {
+ int currentRetry = 0;
+ if (cache != null){
+ RowData cacheRowData = cache.getIfPresent(rowKey);
+ if (cacheRowData != null){
+ if (cacheRowData.getArity() == 0){
+ feature.complete(Collections.emptyList());
+ } else {
+ feature.complete(Collections.singletonList(cacheRowData));
+ }
+ return;
+ }
+ }
+ // fetch result
+ fetchResult(feature, currentRetry, rowKey);
+ }
+
+ /**
+ * Execute async fetch result .
+ * @param feature The result or exception is returned.
+ * @param currentRetry Current number of retries.
+ * @param rowKey the lookup key.
+ */
+ private void fetchResult(CompletableFuture<Collection<RowData>> feature,
int currentRetry, Object rowKey){
+ Get get = serde.createGet(rowKey);
+ CompletableFuture<Result> resultFuture = table.get(get);
+ resultFuture.whenCompleteAsync(
+ (result, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TableNotFoundException) {
+ LOG.error("Table '{}' not found ", hTableName,
throwable);
+ feature.completeExceptionally(
+ new RuntimeException("HBase table '" + hTableName
+ "' not found.", throwable));
+ } else {
+ LOG.error(String.format("Hbase asyncLookup error,
retry times = %d", currentRetry), throwable);
+ if (currentRetry >= maxRetryTimes) {
+ feature.completeExceptionally(throwable);
+ } else {
+ try {
+ Thread.sleep(1000 * currentRetry);
+ } catch (InterruptedException e1) {
+ feature.completeExceptionally(e1);
+ }
+ fetchResult(feature, currentRetry + 1, rowKey);
+ }
+ }
+ } else {
+ boolean flag = result.isEmpty();
+ if (flag) {
+ feature.complete(Collections.emptyList());
+ if (cache != null) {
+ cache.put(rowKey, new GenericRowData(0));
+ }
+ } else {
+ GenericRowData rowData = (GenericRowData)
serde.convertToRow(result);
+ feature.complete(Collections.singletonList(rowData));
+ if (cache != null){
+ cache.put(rowKey, rowData);
+ }
+ }
+ }
+ });
+ }
+
+ private Configuration prepareRuntimeConfiguration() {
+ // create default configuration from current runtime env
(`hbase-site.xml` in classpath) first,
+ // and overwrite configuration using serialized configuration from
client-side env (`hbase-site.xml` in classpath).
+ // user params from client-side have the highest priority
+ Configuration runtimeConfig =
HBaseConfigurationUtil.deserializeConfiguration(
+ serializedConfig,
+ HBaseConfigurationUtil.getHBaseConfiguration());
+
+ // do validation: check key option(s) in final runtime configuration
+ if
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
{
+ LOG.error("can not connect to HBase without {} configuration",
HConstants.ZOOKEEPER_QUORUM);
+ throw new IllegalArgumentException("check HBase configuration
failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ @Override
+ public void open(FunctionContext context) {
+ LOG.info("start open ...");
+ Configuration config = prepareRuntimeConfiguration();
+ CompletableFuture<AsyncConnection> asyncConnectionFuture =
ConnectionFactory.createAsyncConnection(config);
+ try {
+ asyncConnection = asyncConnectionFuture.get();
+ table = asyncConnection.getTable(TableName.valueOf(hTableName),
(ExecutorService) Executors.directExecutor());
+
+ this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null :
CacheBuilder.newBuilder()
+ .recordStats()
+ .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+ .maximumSize(cacheMaxSize)
+ .build();
+ if (cache != null) {
+ context.getMetricGroup().gauge("lookupCacheHitRate",
(Gauge<Double>) () -> cache.stats().hitRate());
+ }
+ } catch (InterruptedException | ExecutionException e) {
+ LOG.error("Exception while creating connection to HBase.", e);
+ throw new RuntimeException("Cannot create connection to HBase.",
e);
+ }
+ this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+ LOG.info("end open.");
+ }
+
+ @Override
+ public void close() {
+ LOG.info("start close ...");
+ if (null != asyncConnection) {
+ try {
+ asyncConnection.close();
+ asyncConnection = null;
+ } catch (IOException e) {
+ // ignore exception when close.
+ LOG.warn("exception when close connection", e);
+ }
+ }
+ LOG.info("end close.");
+ }
+
+ @VisibleForTesting
+ public String getHTableName() {
+ return hTableName;
+ }
+
+
+}
Review comment:
I think we should add a unit test for this class, similar to
`JdbcRowDataLookupFunctionTest`, and an ITCase in `HBaseConnectorITCase` as well.
##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
<td>Integer</td>
<td>Defines the parallelism of the HBase sink operator. By default, the
parallelism is determined by the framework using the same parallelism of the
upstream chained operator.</td>
</tr>
+ <tr>
+ <td><h5>lookup.async</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether async lookup are supported.If true,the lookup will be
async.Note,async only supports hbase-2.2 connector.</td>
Review comment:
Please fix the doc formatting: a comma or period should be followed by a
space. You can use the JDBC doc page as a reference.
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+ private static final long serialVersionUID = 1L;
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+ private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
+
+ private transient AsyncConnection asyncConnection;
+ private transient AsyncTable<ScanResultConsumer> table;
+ private transient HBaseSerde serde;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private transient Cache<Object, RowData> cache;
+
+ public HBaseRowDataAsyncLookupFunction(
+ Configuration configuration,
+ String hTableName,
+ HBaseTableSchema hbaseTableSchema,
+ String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+ this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
+ this.hTableName = hTableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.nullStringLiteral = nullStringLiteral;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ }
+
+ /**
+ * The invoke entry point of lookup function.
+ * @param feature The result or exception is returned.
+ * @param rowKey the lookup key. Currently only support single rowkey.
+ */
+ public void eval(CompletableFuture<Collection<RowData>> feature, Object
rowKey) {
+ int currentRetry = 0;
+ if (cache != null){
+ RowData cacheRowData = cache.getIfPresent(rowKey);
+ if (cacheRowData != null){
+ if (cacheRowData.getArity() == 0){
+ feature.complete(Collections.emptyList());
+ } else {
+ feature.complete(Collections.singletonList(cacheRowData));
+ }
+ return;
+ }
+ }
+ // fetch result
+ fetchResult(feature, currentRetry, rowKey);
+ }
+
+ /**
+ * Execute async fetch result .
+ * @param feature The result or exception is returned.
+ * @param currentRetry Current number of retries.
+ * @param rowKey the lookup key.
+ */
+ private void fetchResult(CompletableFuture<Collection<RowData>> feature,
int currentRetry, Object rowKey){
+ Get get = serde.createGet(rowKey);
+ CompletableFuture<Result> resultFuture = table.get(get);
+ resultFuture.whenCompleteAsync(
+ (result, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TableNotFoundException) {
+ LOG.error("Table '{}' not found ", hTableName,
throwable);
+ feature.completeExceptionally(
+ new RuntimeException("HBase table '" + hTableName
+ "' not found.", throwable));
+ } else {
+ LOG.error(String.format("Hbase asyncLookup error,
retry times = %d", currentRetry), throwable);
+ if (currentRetry >= maxRetryTimes) {
+ feature.completeExceptionally(throwable);
+ } else {
+ try {
+ Thread.sleep(1000 * currentRetry);
+ } catch (InterruptedException e1) {
+ feature.completeExceptionally(e1);
+ }
+ fetchResult(feature, currentRetry + 1, rowKey);
+ }
+ }
+ } else {
+ boolean flag = result.isEmpty();
+ if (flag) {
+ feature.complete(Collections.emptyList());
+ if (cache != null) {
+ cache.put(rowKey, new GenericRowData(0));
+ }
+ } else {
+ GenericRowData rowData = (GenericRowData)
serde.convertToRow(result);
+ feature.complete(Collections.singletonList(rowData));
+ if (cache != null){
+ cache.put(rowKey, rowData);
+ }
+ }
+ }
+ });
+ }
+
+ private Configuration prepareRuntimeConfiguration() {
+ // create default configuration from current runtime env
(`hbase-site.xml` in classpath) first,
+ // and overwrite configuration using serialized configuration from
client-side env (`hbase-site.xml` in classpath).
+ // user params from client-side have the highest priority
+ Configuration runtimeConfig =
HBaseConfigurationUtil.deserializeConfiguration(
+ serializedConfig,
+ HBaseConfigurationUtil.getHBaseConfiguration());
+
+ // do validation: check key option(s) in final runtime configuration
+ if
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
{
+ LOG.error("can not connect to HBase without {} configuration",
HConstants.ZOOKEEPER_QUORUM);
+ throw new IllegalArgumentException("check HBase configuration
failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ @Override
+ public void open(FunctionContext context) {
Review comment:
We can keep the method order consistent with the parent class to make the code more readable.
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+ private static final long serialVersionUID = 1L;
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+ private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
+
+ private transient AsyncConnection asyncConnection;
+ private transient AsyncTable<ScanResultConsumer> table;
+ private transient HBaseSerde serde;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private transient Cache<Object, RowData> cache;
+
+ public HBaseRowDataAsyncLookupFunction(
+ Configuration configuration,
+ String hTableName,
+ HBaseTableSchema hbaseTableSchema,
+ String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+ this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
+ this.hTableName = hTableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.nullStringLiteral = nullStringLiteral;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ }
+
+ /**
+ * The invoke entry point of lookup function.
+ * @param feature The result or exception is returned.
+ * @param rowKey the lookup key. Currently only support single rowkey.
+ */
+ public void eval(CompletableFuture<Collection<RowData>> feature, Object
rowKey) {
+ int currentRetry = 0;
+ if (cache != null){
+ RowData cacheRowData = cache.getIfPresent(rowKey);
+ if (cacheRowData != null){
+ if (cacheRowData.getArity() == 0){
+ feature.complete(Collections.emptyList());
+ } else {
+ feature.complete(Collections.singletonList(cacheRowData));
+ }
+ return;
+ }
+ }
+ // fetch result
+ fetchResult(feature, currentRetry, rowKey);
+ }
+
+ /**
+ * Execute async fetch result .
+ * @param feature The result or exception is returned.
+ * @param currentRetry Current number of retries.
+ * @param rowKey the lookup key.
+ */
+ private void fetchResult(CompletableFuture<Collection<RowData>> feature,
int currentRetry, Object rowKey){
Review comment:
feature -> resultFuture
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+ private static final long serialVersionUID = 1L;
+
+ private final String hTableName;
+ private final byte[] serializedConfig;
+ private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
+
+ private transient AsyncConnection asyncConnection;
+ private transient AsyncTable<ScanResultConsumer> table;
+ private transient HBaseSerde serde;
+
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ private final int maxRetryTimes;
+ private transient Cache<Object, RowData> cache;
+
+ public HBaseRowDataAsyncLookupFunction(
+ Configuration configuration,
+ String hTableName,
+ HBaseTableSchema hbaseTableSchema,
+ String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+ this.serializedConfig =
HBaseConfigurationUtil.serializeConfiguration(configuration);
+ this.hTableName = hTableName;
+ this.hbaseTableSchema = hbaseTableSchema;
+ this.nullStringLiteral = nullStringLiteral;
+ this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+ this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+ }
+
+ /**
+ * The invoke entry point of lookup function.
+ * @param feature The result or exception is returned.
+ * @param rowKey the lookup key. Currently only support single rowkey.
+ */
+ public void eval(CompletableFuture<Collection<RowData>> feature, Object
rowKey) {
+ int currentRetry = 0;
+ if (cache != null){
+ RowData cacheRowData = cache.getIfPresent(rowKey);
+ if (cacheRowData != null){
+ if (cacheRowData.getArity() == 0){
+ feature.complete(Collections.emptyList());
+ } else {
+ feature.complete(Collections.singletonList(cacheRowData));
+ }
+ return;
+ }
+ }
+ // fetch result
+ fetchResult(feature, currentRetry, rowKey);
+ }
+
+ /**
+  * Issues an asynchronous {@code Get} for {@code rowKey} and completes {@code feature} from
+  * the callback.
+  *
+  * <ul>
+  *   <li>{@link TableNotFoundException}: fails the future immediately, no retry.
+  *   <li>Any other failure: retried until {@code currentRetry >= maxRetryTimes}, sleeping
+  *       {@code 1000 * currentRetry} ms before each retry; then the future is failed with the
+  *       last error.
+  *   <li>Success: completes with an empty list (and caches a zero-arity row) when the result
+  *       is empty, otherwise completes with the converted row (and caches it).
+  * </ul>
+  *
+  * @param feature the future that receives the looked-up rows or the failure.
+  * @param currentRetry number of retries already performed for this key.
+  * @param rowKey the lookup key.
+  */
+ private void fetchResult(
+         CompletableFuture<Collection<RowData>> feature, int currentRetry, Object rowKey) {
+     Get get = serde.createGet(rowKey);
+     CompletableFuture<Result> resultFuture = table.get(get);
+     resultFuture.whenCompleteAsync(
+             (result, throwable) -> {
+                 if (throwable != null) {
+                     if (throwable instanceof TableNotFoundException) {
+                         LOG.error("Table '{}' not found ", hTableName, throwable);
+                         feature.completeExceptionally(
+                                 new RuntimeException(
+                                         "HBase table '" + hTableName + "' not found.",
+                                         throwable));
+                     } else {
+                         LOG.error(
+                                 String.format(
+                                         "Hbase asyncLookup error, retry times = %d",
+                                         currentRetry),
+                                 throwable);
+                         if (currentRetry >= maxRetryTimes) {
+                             feature.completeExceptionally(throwable);
+                         } else {
+                             try {
+                                 // NOTE(review): this blocks the thread running the callback
+                                 // for the whole back-off; a scheduled retry would avoid that.
+                                 Thread.sleep(1000L * currentRetry);
+                             } catch (InterruptedException e1) {
+                                 // Restore the interrupt flag and stop here: the future is
+                                 // already failed, so issuing another Get would be wasted
+                                 // work on a thread that was asked to stop.
+                                 Thread.currentThread().interrupt();
+                                 feature.completeExceptionally(e1);
+                                 return;
+                             }
+                             fetchResult(feature, currentRetry + 1, rowKey);
+                         }
+                     }
+                 } else {
+                     if (result.isEmpty()) {
+                         feature.complete(Collections.emptyList());
+                         if (cache != null) {
+                             // Zero-arity row marks "key looked up, nothing found".
+                             cache.put(rowKey, new GenericRowData(0));
+                         }
+                     } else {
+                         GenericRowData rowData = (GenericRowData) serde.convertToRow(result);
+                         feature.complete(Collections.singletonList(rowData));
+                         if (cache != null) {
+                             cache.put(rowKey, rowData);
+                         }
+                     }
+                 }
+             });
+ }
+
+ private Configuration prepareRuntimeConfiguration() {
+ // create default configuration from current runtime env
(`hbase-site.xml` in classpath) first,
+ // and overwrite configuration using serialized configuration from
client-side env (`hbase-site.xml` in classpath).
+ // user params from client-side have the highest priority
+ Configuration runtimeConfig =
HBaseConfigurationUtil.deserializeConfiguration(
+ serializedConfig,
+ HBaseConfigurationUtil.getHBaseConfiguration());
+
+ // do validation: check key option(s) in final runtime configuration
+ if
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
{
+ LOG.error("can not connect to HBase without {} configuration",
HConstants.ZOOKEEPER_QUORUM);
+ throw new IllegalArgumentException("check HBase configuration
failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+ }
+
+ return runtimeConfig;
+ }
+
+ /**
+  * Creates the async HBase connection and table handle, builds the optional lookup cache and
+  * the serde.
+  *
+  * <p>The cache is left {@code null} when {@code cacheMaxSize == -1} or {@code cacheExpireMs
+  * == 0}; otherwise its hit rate is exported as the {@code lookupCacheHitRate} gauge.
+  *
+  * @param context function context, used to register the cache metric.
+  * @throws RuntimeException if the connection cannot be created or the wait is interrupted.
+  */
+ @Override
+ public void open(FunctionContext context) {
+     LOG.info("start open ...");
+     Configuration config = prepareRuntimeConfiguration();
+     CompletableFuture<AsyncConnection> asyncConnectionFuture =
+             ConnectionFactory.createAsyncConnection(config);
+     try {
+         asyncConnection = asyncConnectionFuture.get();
+         table =
+                 asyncConnection.getTable(
+                         TableName.valueOf(hTableName),
+                         (ExecutorService) Executors.directExecutor());
+
+         this.cache =
+                 cacheMaxSize == -1 || cacheExpireMs == 0
+                         ? null
+                         : CacheBuilder.newBuilder()
+                                 .recordStats()
+                                 .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                                 .maximumSize(cacheMaxSize)
+                                 .build();
+         if (cache != null) {
+             context.getMetricGroup()
+                     .gauge("lookupCacheHitRate", (Gauge<Double>) () -> cache.stats().hitRate());
+         }
+     } catch (InterruptedException | ExecutionException e) {
+         if (e instanceof InterruptedException) {
+             // Wrapping the exception would otherwise hide the interruption from callers
+             // further up the stack, so restore the flag before rethrowing.
+             Thread.currentThread().interrupt();
+         }
+         LOG.error("Exception while creating connection to HBase.", e);
+         throw new RuntimeException("Cannot create connection to HBase.", e);
+     }
+     this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+     LOG.info("end open.");
+ }
+
+ /**
+  * Closes the async HBase connection if one is open. A failure while closing is logged and
+  * not rethrown; the connection reference is cleared only after a successful close.
+  */
+ @Override
+ public void close() {
+     LOG.info("start close ...");
+     final AsyncConnection connection = asyncConnection;
+     if (connection != null) {
+         try {
+             connection.close();
+             asyncConnection = null;
+         } catch (IOException closeFailure) {
+             // ignore exception when close.
+             LOG.warn("exception when close connection", closeFailure);
+         }
+     }
+     LOG.info("end close.");
+ }
+
+ /** Returns the name of the HBase table this function looks up; exposed for tests. */
+ @VisibleForTesting
+ public String getHTableName() {
+     return this.hTableName;
+ }
+
+
Review comment:
redundant blank line
##########
File path: docs/dev/table/connectors/hbase.md
##########
@@ -172,6 +172,34 @@ Connector Options
<td>Integer</td>
<td>Defines the parallelism of the HBase sink operator. By default, the
parallelism is determined by the framework using the same parallelism of the
upstream chained operator.</td>
</tr>
+ <tr>
+ <td><h5>lookup.async</h5></td>
+ <td>optional</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether async lookup is enabled. If true, the lookup will be
async. Note: async lookup is only supported by the hbase-2.2 connector.</td>
Review comment:
> async only supports hbase-2.2 connector
It would be great if the `hbase-1.4` connector threw an exception with a clear
message when a user tries to use this feature.
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends
AsyncTableFunction<RowData> {
+
+ // Class-wide SLF4J logger.
+ private static final Logger LOG =
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+
+
+ private static final long serialVersionUID = 1L;
+
+ // Name of the HBase table to look up.
+ private final String hTableName;
+ // Hadoop configuration in the serialized form produced by
+ // HBaseConfigurationUtil.serializeConfiguration(...) in the constructor.
+ private final byte[] serializedConfig;
+ // Table schema and null literal supplied by the caller of the constructor.
+ private final HBaseTableSchema hbaseTableSchema;
+ private final String nullStringLiteral;
+
+ // Runtime-only state: transient, so not part of the serialized function.
+ private transient AsyncConnection asyncConnection;
+ private transient AsyncTable<ScanResultConsumer> table;
+ private transient HBaseSerde serde;
+
+ // Lookup options copied out of HBaseLookupOptions by the constructor.
+ private final long cacheMaxSize;
+ private final long cacheExpireMs;
+ // Upper bound that fetchResult() compares the current retry count against.
+ private final int maxRetryTimes;
+ // Row key -> looked-up row. eval() treats a cached zero-arity row as "no rows"
+ // and treats a null cache as "caching disabled".
+ private transient Cache<Object, RowData> cache;
+
+ /**
+  * Creates the async lookup function. The Hadoop configuration is kept in serialized form and
+  * the cache / retry settings are copied out of {@code lookupOptions}.
+  *
+  * @param configuration Hadoop configuration; stored as serialized bytes.
+  * @param hTableName name of the HBase table to query.
+  * @param hbaseTableSchema schema of the HBase table.
+  * @param nullStringLiteral literal that represents null values.
+  * @param lookupOptions source of the cache max size, cache expiry (ms) and max retry times.
+  */
+ public HBaseRowDataAsyncLookupFunction(
+         Configuration configuration,
+         String hTableName,
+         HBaseTableSchema hbaseTableSchema,
+         String nullStringLiteral,
+         HBaseLookupOptions lookupOptions) {
+     // Connection settings: serialized so the function itself stays serializable.
+     this.serializedConfig = HBaseConfigurationUtil.serializeConfiguration(configuration);
+
+     // Table identity and serde inputs.
+     this.hTableName = hTableName;
+     this.nullStringLiteral = nullStringLiteral;
+     this.hbaseTableSchema = hbaseTableSchema;
+
+     // Lookup cache and retry settings.
+     this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+     this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+     this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+ }
+
+ /**
+  * Entry point of the lookup function. Serves the row from the cache when it is present there,
+  * otherwise starts an asynchronous fetch.
+  *
+  * @param feature the future that receives the looked-up rows or the failure.
+  * @param rowKey the lookup key. Currently only a single rowkey is supported.
+  */
+ public void eval(CompletableFuture<Collection<RowData>> feature, Object rowKey) {
+     final RowData cachedRow = cache == null ? null : cache.getIfPresent(rowKey);
+     if (cachedRow == null) {
+         // Cache disabled or cache miss: start the first attempt (retry count 0).
+         fetchResult(feature, 0, rowKey);
+         return;
+     }
+
+     // A cached zero-arity row stands for "this key has no rows".
+     final Collection<RowData> rows =
+             cachedRow.getArity() == 0
+                     ? Collections.<RowData>emptyList()
+                     : Collections.singletonList(cachedRow);
+     feature.complete(rows);
+ }
+
+ /**
+ * Execute async fetch result .
+ * @param feature The result or exception is returned.
+ * @param currentRetry Current number of retries.
+ * @param rowKey the lookup key.
+ */
+ private void fetchResult(CompletableFuture<Collection<RowData>> feature,
int currentRetry, Object rowKey){
+ Get get = serde.createGet(rowKey);
+ CompletableFuture<Result> resultFuture = table.get(get);
+ resultFuture.whenCompleteAsync(
+ (result, throwable) -> {
+ if (throwable != null) {
+ if (throwable instanceof TableNotFoundException) {
+ LOG.error("Table '{}' not found ", hTableName,
throwable);
+ feature.completeExceptionally(
+ new RuntimeException("HBase table '" + hTableName
+ "' not found.", throwable));
+ } else {
+ LOG.error(String.format("Hbase asyncLookup error,
retry times = %d", currentRetry), throwable);
+ if (currentRetry >= maxRetryTimes) {
+ feature.completeExceptionally(throwable);
+ } else {
+ try {
+ Thread.sleep(1000 * currentRetry);
+ } catch (InterruptedException e1) {
+ feature.completeExceptionally(e1);
+ }
+ fetchResult(feature, currentRetry + 1, rowKey);
+ }
+ }
+ } else {
+ boolean flag = result.isEmpty();
+ if (flag) {
Review comment:
if (result.isEmpty()) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]