wuchong commented on a change in pull request #14684:
URL: https://github.com/apache/flink/pull/14684#discussion_r591233115



##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table 
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up 
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = 
ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), 
(ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : 
CacheBuilder.newBuilder()

Review comment:
       It would be safer to use `cacheMaxSize <= 0 || cacheExpireMs <= 0`.

##########
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {

Review comment:
       Use primitive `boolean` instead of `Boolean` if it should never be null. 

##########
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {
+        if (!needReuse){
+            // The output rows needs to be initialized each time
+            // to prevent the possibility of putting the output object into 
the cache.
+            reusedRow = new GenericRowData(fieldLength);

Review comment:
       If reuse is not needed, we shouldn't assign the new object to `reusedRow`; 
otherwise, the new object will be modified by the next call. 
   
   We can have local variables `reusedRow` and `reusedFamilyRows`, and assign 
them according to the `needReuse` flag. 
   
   Besides, please add a test for this. 

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table 
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up 
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = 
ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), 
(ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : 
CacheBuilder.newBuilder()
+                .recordStats()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+            if (cache != null && context != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", 
(Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", 
e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object 
rowKey) {

Review comment:
       `feature` ==> `future`

##########
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return

Review comment:
       Remove the `@return` tag if its description is empty. 

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table 
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up 
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = 
ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), 
(ExecutorService) Executors.directExecutor());
+
+            this.cache = cacheMaxSize == -1 || cacheExpireMs == 0 ? null : 
CacheBuilder.newBuilder()
+                .recordStats()
+                .expireAfterWrite(cacheExpireMs, TimeUnit.MILLISECONDS)
+                .maximumSize(cacheMaxSize)
+                .build();
+            if (cache != null && context != null) {
+                context.getMetricGroup().gauge("lookupCacheHitRate", 
(Gauge<Double>) () -> cache.stats().hitRate());
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            LOG.error("Exception while creating connection to HBase.", e);
+            throw new RuntimeException("Cannot create connection to HBase.", 
e);
+        }
+        this.serde = new HBaseSerde(hbaseTableSchema, nullStringLiteral);
+        LOG.info("end open.");
+    }
+
+    /**
+     * The invoke entry point of lookup function.
+     * @param feature The result or exception is returned.
+     * @param rowKey the lookup key. Currently only support single rowkey.
+     */
+    public void eval(CompletableFuture<Collection<RowData>> feature, Object 
rowKey) {
+        int currentRetry = 0;
+        if (cache != null){
+            RowData cacheRowData = cache.getIfPresent(rowKey);
+            if (cacheRowData  != null){
+                if (cacheRowData.getArity() == 0){
+                    feature.complete(Collections.emptyList());
+                } else {
+                    feature.complete(Collections.singletonList(cacheRowData));
+                }
+                return;
+            }
+        }
+        // fetch result
+        fetchResult(feature, currentRetry, rowKey);
+    }
+
+    /**
+     * Execute async fetch result .
+     * @param resultFuture The result or exception is returned.
+     * @param currentRetry Current number of retries.
+     * @param rowKey the lookup key.
+     */
+    private void fetchResult(CompletableFuture<Collection<RowData>> 
resultFuture, int currentRetry, Object rowKey){
+        Get get = serde.createGet(rowKey);
+        CompletableFuture<Result> responseFuture = table.get(get);
+        responseFuture.whenCompleteAsync(
+            (result, throwable) -> {
+                if (throwable != null) {
+                    if (throwable instanceof TableNotFoundException) {
+                        LOG.error("Table '{}' not found ", hTableName, 
throwable);
+                        resultFuture.completeExceptionally(
+                            new RuntimeException("HBase table '" + hTableName 
+ "' not found.", throwable));
+                    } else {
+                        LOG.error(String.format("HBase asyncLookup error, 
retry times = %d", currentRetry), throwable);
+                        if (currentRetry >= maxRetryTimes) {
+                            resultFuture.completeExceptionally(throwable);
+                        } else {
+                            try {
+                                Thread.sleep(1000 * currentRetry);
+                            } catch (InterruptedException e1) {
+                                resultFuture.completeExceptionally(e1);
+                            }
+                            fetchResult(resultFuture, currentRetry + 1, 
rowKey);
+                        }
+                    }
+                } else {
+                    if (result.isEmpty()) {
+                        resultFuture.complete(Collections.emptyList());
+                        if (cache != null) {
+                            cache.put(rowKey, new GenericRowData(0));
+                        }
+                    } else {
+                        if (cache != null){
+                            RowData rowData = serde.convertToRow(result, 
false);
+                            
resultFuture.complete(Collections.singletonList(rowData));
+                            cache.put(rowKey, rowData);
+                        } else {
+                            
resultFuture.complete(Collections.singletonList(serde.convertToRow(result, 
true)));
+                        }
+                    }
+                }
+            });
+    }
+
+    private Configuration prepareRuntimeConfiguration() {
+        // create default configuration from current runtime env 
(`hbase-site.xml` in classpath) first,
+        // and overwrite configuration using serialized configuration from 
client-side env (`hbase-site.xml` in classpath).
+        // user params from client-side have the highest priority
+        Configuration runtimeConfig = 
HBaseConfigurationUtil.deserializeConfiguration(
+                serializedConfig,
+                HBaseConfigurationUtil.getHBaseConfiguration());
+
+        // do validation: check key option(s) in final runtime configuration
+        if 
(StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM)))
 {
+            LOG.error("can not connect to HBase without {} configuration", 
HConstants.ZOOKEEPER_QUORUM);
+            throw new IllegalArgumentException("check HBase configuration 
failed, lost: '" + HConstants.ZOOKEEPER_QUORUM + "'!");
+        }
+
+        return runtimeConfig;
+    }
+
+    @Override
+    public void close() {
+        LOG.info("start close ...");
+        if (null != asyncConnection) {

Review comment:
       Why not close `table`?

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -122,6 +127,10 @@ public String factoryIdentifier() {
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
+        set.add(LOOKUP_ASYNC);

Review comment:
       We don't need to add this option if we don't support it. 

##########
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/util/HBaseSerde.java
##########
@@ -223,8 +223,22 @@ public Get createGet(Object rowKey) {
         return get;
     }
 
-    /** Converts HBase {@link Result} into {@link RowData}. */
-    public RowData convertToRow(Result result) {
+    /**
+     * Converts HBase {@link Result} into {@link RowData}.
+     * @param result result of a query
+     * @param needReuse Whether the RowData structure needs to be reused
+     * @return
+     */
+    public RowData convertToRow(Result result, Boolean needReuse) {
+        if (!needReuse){
+            // The output rows needs to be initialized each time
+            // to prevent the possibility of putting the output object into 
the cache.
+            reusedRow = new GenericRowData(fieldLength);

Review comment:
       This does have a bug because this method is not thread-safe. When it is 
called by multiple threads, all the threads will get the same result object. I 
think we can have two methods that share the common logic. 
   
   ```java
   /**
        * Converts HBase {@link Result} into a new {@link RowData} instance.
        *
        * <p>Note: this method is thread-safe.
        */
       public RowData convertToNewRow(Result result) {
           // The output rows needs to be initialized each time
           // to prevent the possibility of putting the output object into the 
cache.
           GenericRowData resultRow = new GenericRowData(fieldLength);
           GenericRowData[] familyRows = new GenericRowData[families.length];
           for (int f = 0; f < families.length; f++) {
               familyRows[f] = new GenericRowData(qualifiers[f].length);
           }
           return convertToRow(result, resultRow, familyRows);
       }
   
       /**
        * Converts HBase {@link Result} into a reused {@link RowData} instance.
        *
        * <p>Note: this method is NOT thread-safe.
        */
       public RowData convertToReusedRow(Result result) {
           return convertToRow(result, reusedRow, reusedFamilyRows);
       }
   
       private RowData convertToRow(
               Result result, GenericRowData resultRow, GenericRowData[] 
familyRows) {
           for (int i = 0; i < fieldLength; i++) {
               if (rowkeyIndex == i) {
                   assert keyDecoder != null;
                   Object rowkey = keyDecoder.decode(result.getRow());
                   resultRow.setField(rowkeyIndex, rowkey);
               } else {
                   int f = (rowkeyIndex != -1 && i > rowkeyIndex) ? i - 1 : i;
                   // get family key
                   byte[] familyKey = families[f];
                   GenericRowData familyRow = familyRows[f];
                   for (int q = 0; q < this.qualifiers[f].length; q++) {
                       // get qualifier key
                       byte[] qualifier = qualifiers[f][q];
                       // read value
                       byte[] value = result.getValue(familyKey, qualifier);
                       familyRow.setField(q, 
qualifierDecoders[f][q].decode(value));
                   }
                   resultRow.setField(i, familyRow);
               }
           }
           return resultRow;
       }
   ```
   
   

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table 
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up 
the result as {@link

Review comment:
       The Javadoc is incorrect. The Table API doesn't support 
`AsyncTableFunction`; thus, this implementation can only be an internal class 
and can't be used by users directly. 
   
   You can simply describe the class as an implementation that looks up HBase 
data by rowkey in an async fashion. 

##########
File path: docs/content/docs/connectors/table/hbase.md
##########
@@ -166,6 +166,34 @@ Connector Options
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the 
parallelism is determined by the framework using the same parallelism of the 
upstream chained operator.</td>
     </tr>
+    <tr>
+      <td><h5>lookup.async</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Whether async lookup are supported. If true, the lookup will be 
async. Note, async only supports hbase-2.2 connector.</td>

Review comment:
       ```suggestion
         <td>Whether async lookup is enabled. If true, the lookup will be 
async. Note, async only supports hbase-2.2 connector.</td>
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java
##########
@@ -35,17 +42,45 @@ public HBaseDynamicTableSource(
             Configuration conf,
             String tableName,
             HBaseTableSchema hbaseSchema,
-            String nullStringLiteral) {
-        super(conf, tableName, hbaseSchema, nullStringLiteral);
+            String nullStringLiteral,
+            HBaseLookupOptions lookupOptions) {
+        super(conf, tableName, hbaseSchema, nullStringLiteral, lookupOptions);
+    }
+
+    @Override
+    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext 
context) {
+        checkArgument(context.getKeys().length == 1 && 
context.getKeys()[0].length == 1,
+            "Currently, HBase table can only be lookup by single rowkey.");
+        checkArgument(
+            hbaseSchema.getRowKeyName().isPresent(),
+            "HBase schema must have a row key when used in lookup mode.");
+        checkArgument(
+            hbaseSchema
+                .convertsToTableSchema()
+                .getTableColumn(context.getKeys()[0][0])
+                .filter(f -> 
f.getName().equals(hbaseSchema.getRowKeyName().get()))
+                .isPresent(),
+            "Currently, HBase table only supports lookup by rowkey field.");
+        if (lookupOptions.getLookupAsync()){
+            return AsyncTableFunctionProvider.of(new 
HBaseRowDataAsyncLookupFunction(conf, tableName, hbaseSchema,
+                nullStringLiteral, lookupOptions));
+        }
+        return TableFunctionProvider.of(new HBaseRowDataLookupFunction(conf, 
tableName, hbaseSchema, nullStringLiteral,

Review comment:
       Nit: can put in `else` branch. 

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunction.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseConfigurationUtil;
+import org.apache.flink.connector.hbase.util.HBaseSerde;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.functions.AsyncTableFunction;
+import org.apache.flink.table.functions.FunctionContext;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ScanResultConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The HBaseRowDataAsyncLookupFunction is a standard user-defined table 
function, it can be used in
+ * tableAPI and also useful for temporal table join plan in SQL. It looks up 
the result as {@link
+ * RowData}.
+ */
+@Internal
+public class HBaseRowDataAsyncLookupFunction extends 
AsyncTableFunction<RowData> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseRowDataAsyncLookupFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private final String hTableName;
+    private final byte[] serializedConfig;
+    private final HBaseTableSchema hbaseTableSchema;
+    private final String nullStringLiteral;
+
+    private transient AsyncConnection asyncConnection;
+    private transient AsyncTable<ScanResultConsumer> table;
+    private transient HBaseSerde serde;
+
+    private final long cacheMaxSize;
+    private final long cacheExpireMs;
+    private final int maxRetryTimes;
+    private transient Cache<Object, RowData> cache;
+
+    public HBaseRowDataAsyncLookupFunction(
+            Configuration configuration,
+            String hTableName,
+            HBaseTableSchema hbaseTableSchema,
+            String nullStringLiteral, HBaseLookupOptions lookupOptions) {
+        this.serializedConfig = 
HBaseConfigurationUtil.serializeConfiguration(configuration);
+        this.hTableName = hTableName;
+        this.hbaseTableSchema = hbaseTableSchema;
+        this.nullStringLiteral = nullStringLiteral;
+        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
+        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
+        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
+    }
+
+    @Override
+    public void open(FunctionContext context) {
+        LOG.info("start open ...");
+        Configuration config = prepareRuntimeConfiguration();
+        CompletableFuture<AsyncConnection> asyncConnectionFuture = 
ConnectionFactory.createAsyncConnection(config);
+        try {
+            asyncConnection = asyncConnectionFuture.get();
+            table = asyncConnection.getTable(TableName.valueOf(hTableName), 
(ExecutorService) Executors.directExecutor());

Review comment:
       IIUC, the executor will be used to execute callbacks. Using 
`directExecutor` means the callbacks can't be executed asynchronously, which may 
cause performance problems. I think we should create an `Executors.newFixedThreadPool` 
for this. 
   
   See example: 
https://github.com/apache/hbase/blob/master/hbase-examples/src/main/java/org/apache/hadoop/hbase/client/example/AsyncClientExample.java

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/HBaseConnectorITCase.java
##########
@@ -581,6 +581,82 @@ public void testHBaseLookupTableSource() {
         assertEquals(expected, result);
     }
 
+    @Test
+    public void testHBaseAsyncLookupTableSource() {

Review comment:
       This test is a duplicate of `testHBaseLookupTableSource`. You can share 
the common logic of them, e.g. extract a common testing method:
   
   ```
   private void verifyHBaseLookupJoin(boolean async) {
     ...
   }
   
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
+
+    @Override
+    protected PlannerType planner() {
+        // lookup table source is only supported in blink planner
+        return PlannerType.BLINK_PLANNER;
+    }
+
+    @Test
+    public void testEval() throws Exception {
+        HBaseRowDataAsyncLookupFunction lookupFunction = 
buildRowDataAsyncLookupFunction();
+
+        lookupFunction.open(null);
+        List<RowData> list = new ArrayList<>();
+        int[] rowKey = {1, 2, 3};
+        for (int i = 0; i < rowKey.length; i++){
+            CompletableFuture<Collection<RowData>> feature = new 
CompletableFuture<>();
+            lookupFunction.eval(feature, rowKey[i]);
+            list.add(feature.get().iterator().next());
+        }
+        lookupFunction.close();
+        List<String> result =
+            Lists.newArrayList(list).stream()
+                .map(RowData::toString)
+                .sorted()
+                .collect(Collectors.toList());

Review comment:
       The above test also tests non-existent keys, and please also test it using 
a no-cache async lookup function.

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/test/java/org/apache/flink/connector/hbase2/source/HBaseRowDataAsyncLookupFunctionTest.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.hbase2.source;
+
+import org.apache.flink.connector.hbase.options.HBaseLookupOptions;
+import org.apache.flink.connector.hbase.util.HBaseTableSchema;
+import org.apache.flink.connector.hbase.util.PlannerType;
+import org.apache.flink.connector.hbase2.util.HBaseTestBase;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+
+/** Test suite for {@link HBaseRowDataAsyncLookupFunction}. */
+public class HBaseRowDataAsyncLookupFunctionTest extends HBaseTestBase {
+
+    @Override
+    protected PlannerType planner() {
+        // lookup table source is only supported in blink planner
+        return PlannerType.BLINK_PLANNER;
+    }
+
+    @Test
+    public void testEval() throws Exception {
+        HBaseRowDataAsyncLookupFunction lookupFunction = 
buildRowDataAsyncLookupFunction();
+
+        lookupFunction.open(null);
+        List<RowData> list = new ArrayList<>();
+        int[] rowKey = {1, 2, 3};
+        for (int i = 0; i < rowKey.length; i++){
+            CompletableFuture<Collection<RowData>> feature = new 
CompletableFuture<>();
+            lookupFunction.eval(feature, rowKey[i]);
+            list.add(feature.get().iterator().next());
+        }
+        lookupFunction.close();
+        List<String> result =
+            Lists.newArrayList(list).stream()
+                .map(RowData::toString)
+                .sorted()
+                .collect(Collectors.toList());

Review comment:
       This makes the async lookup synchronous and doesn't test the feature. I 
suggest changing it to an async fashion, which also reveals a bug in HBaseSerDe. You 
can use the following test:
   
   
   ```java
   final List<String> result = new ArrayList<>();
           int[] rowkeys = {1, 2, 1, 12, 3, 12, 4, 3};
           CountDownLatch latch = new CountDownLatch(rowkeys.length);
           for (int rowkey : rowkeys) {
               CompletableFuture<Collection<RowData>> future = new 
CompletableFuture<>();
               lookupFunction.eval(future, rowkey);
               future.whenComplete(
                       (rs, t) -> {
                           System.out.println("complete: " + rs);
                           synchronized (result) {
                               if (rs.isEmpty()) {
                                   result.add(rowkey + ": null");
                               } else {
                                   rs.forEach(row -> result.add(rowkey + ": " + 
row.toString()));
                               }
                           }
                           latch.countDown();
                       });
           }
   
           // this verifies lookup calls are async
           assertTrue(result.size() < rowkeys.length);
           latch.await();
           
           assertEquals(...);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to