This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5175ed0d48835344c1cd4282372d6b01571d914b
Author: slinkydeveloper <[email protected]>
AuthorDate: Thu Jan 6 16:35:09 2022 +0100

    [FLINK-25391][connector-hbase] Forward catalog table options
---
 docs/content/docs/connectors/table/hbase.md        | 17 ++++++++++++-
 .../hbase1/HBase1DynamicTableFactory.java          | 29 ++++++++++++++++------
 .../hbase2/HBase2DynamicTableFactory.java          | 27 ++++++++++++++------
 .../hbase/table/HBaseConnectorOptionsUtil.java     | 18 ++++++--------
 4 files changed, 65 insertions(+), 26 deletions(-)

diff --git a/docs/content/docs/connectors/table/hbase.md 
b/docs/content/docs/connectors/table/hbase.md
index 86d45e5..21436cd 100644
--- a/docs/content/docs/connectors/table/hbase.md
+++ b/docs/content/docs/connectors/table/hbase.md
@@ -82,15 +82,17 @@ Connector Options
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
-        <th class="text-center" style="width: 50%">Description</th>
+        <th class="text-center" style="width: 42%">Description</th>
       </tr>
     </thead>
     <tbody>
     <tr>
       <td><h5>connector</h5></td>
       <td>required</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>Specify what connector to use, valid values are:
@@ -103,6 +105,7 @@ Connector Options
     <tr>
       <td><h5>table-name</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The name of HBase table to connect. By default, the table is in 
'default' namespace. To assign the table a specified namespace you need to use 
'namespace:table'.</td>
@@ -110,6 +113,7 @@ Connector Options
     <tr>
       <td><h5>zookeeper.quorum</h5></td>
       <td>required</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>The HBase Zookeeper quorum.</td>
@@ -117,6 +121,7 @@ Connector Options
     <tr>
       <td><h5>zookeeper.znode.parent</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">/hbase</td>
       <td>String</td>
       <td>The root dir in Zookeeper for HBase cluster.</td>
@@ -124,6 +129,7 @@ Connector Options
     <tr>
       <td><h5>null-string-literal</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">null</td>
       <td>String</td>
       <td>Representation for null values for string fields. HBase source and 
sink encodes/decodes empty bytes as null values for all types except string 
type.</td>
@@ -131,6 +137,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-size</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">2mb</td>
       <td>MemorySize</td>
       <td>Writing option, maximum size in memory of buffered rows for each 
writing request.
@@ -141,6 +148,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1000</td>
       <td>Integer</td>
       <td>Writing option, maximum number of rows to buffer for each writing 
request.
@@ -151,6 +159,7 @@ Connector Options
     <tr>
       <td><h5>sink.buffer-flush.interval</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">1s</td>
       <td>Duration</td>
       <td>Writing option, the interval to flush any buffered rows.
@@ -162,6 +171,7 @@ Connector Options
     <tr>
       <td><h5>sink.parallelism</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>Integer</td>
       <td>Defines the parallelism of the HBase sink operator. By default, the 
parallelism is determined by the framework using the same parallelism of the 
upstream chained operator.</td>
@@ -169,6 +179,7 @@ Connector Options
     <tr>
       <td><h5>lookup.async</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">false</td>
       <td>Boolean</td>
       <td>Whether async lookup are enabled. If true, the lookup will be async. 
Note, async only supports hbase-2.2 connector.</td>
@@ -176,6 +187,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.max-rows</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">-1</td>
       <td>Long</td>
       <td>The max number of rows of lookup cache, over this value, the oldest 
rows will be expired. Note, "lookup.cache.max-rows" and "lookup.cache.ttl" 
options must all be specified if any of them is specified. Lookup cache is 
disabled by default.</td>
@@ -183,6 +195,7 @@ Connector Options
     <tr>
       <td><h5>lookup.cache.ttl</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">0 s</td>
       <td>Duration</td>
       <td>The max time to live for each rows in lookup cache, over this time, 
the oldest rows will be expired. Note, "cache.max-rows" and "cache.ttl" options 
must all be specified if any of them is specified.Lookup cache is disabled by 
default.</td>
@@ -190,6 +203,7 @@ Connector Options
     <tr>
       <td><h5>lookup.max-retries</h5></td>
       <td>optional</td>
+      <td>yes</td>
       <td style="word-wrap: break-word;">3</td>
       <td>Integer</td>
       <td>The max retry times if lookup database failed.</td>
@@ -197,6 +211,7 @@ Connector Options
     <tr>
       <td><h5>properties.*</h5></td>
       <td>optional</td>
+      <td>no</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
       <td>
diff --git 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
index 3454064..6a3e6ba 100644
--- 
a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
@@ -34,9 +34,11 @@ import 
org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
+import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES;
@@ -69,12 +71,10 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseClientConf = getHBaseConfiguration(options);
+        Configuration hbaseClientConf = getHBaseConfiguration(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
                 
HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());
@@ -94,12 +94,10 @@ public class HBase1DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = 
getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -131,9 +129,26 @@ public class HBase1DynamicTableFactory
         set.add(SINK_BUFFER_FLUSH_MAX_ROWS);
         set.add(SINK_BUFFER_FLUSH_INTERVAL);
         set.add(SINK_PARALLELISM);
+        set.add(LOOKUP_ASYNC);
         set.add(LOOKUP_CACHE_MAX_ROWS);
         set.add(LOOKUP_CACHE_TTL);
         set.add(LOOKUP_MAX_RETRIES);
         return set;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        TABLE_NAME,
+                        ZOOKEEPER_ZNODE_PARENT,
+                        ZOOKEEPER_QUORUM,
+                        NULL_STRING_LITERAL,
+                        SINK_BUFFER_FLUSH_MAX_SIZE,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES)
+                .collect(Collectors.toSet());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
index c9246b6..9b9a525 100644
--- 
a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
+++ 
b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
@@ -35,8 +35,9 @@ import 
org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper;
 import org.apache.hadoop.conf.Configuration;
 
 import java.util.HashSet;
-import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC;
 import static 
org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS;
@@ -71,12 +72,10 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -93,12 +92,10 @@ public class HBase2DynamicTableFactory
 
         final ReadableConfig tableOptions = helper.getOptions();
 
-        Map<String, String> options = context.getCatalogTable().getOptions();
-
         validatePrimaryKey(context.getPhysicalRowDataType(), 
context.getPrimaryKeyIndexes());
 
         String tableName = tableOptions.get(TABLE_NAME);
-        Configuration hbaseConf = getHBaseConfiguration(options);
+        Configuration hbaseConf = getHBaseConfiguration(tableOptions);
         HBaseWriteOptions hBaseWriteOptions = 
getHBaseWriteOptions(tableOptions);
         String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema =
@@ -136,4 +133,20 @@ public class HBase2DynamicTableFactory
         set.add(LOOKUP_MAX_RETRIES);
         return set;
     }
+
+    @Override
+    public Set<ConfigOption<?>> forwardOptions() {
+        return Stream.of(
+                        TABLE_NAME,
+                        ZOOKEEPER_ZNODE_PARENT,
+                        ZOOKEEPER_QUORUM,
+                        NULL_STRING_LITERAL,
+                        LOOKUP_CACHE_MAX_ROWS,
+                        LOOKUP_CACHE_TTL,
+                        LOOKUP_MAX_RETRIES,
+                        SINK_BUFFER_FLUSH_MAX_SIZE,
+                        SINK_BUFFER_FLUSH_MAX_ROWS,
+                        SINK_BUFFER_FLUSH_INTERVAL)
+                .collect(Collectors.toSet());
+    }
 }
diff --git 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
index 6b9804f..4585610 100644
--- 
a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
+++ 
b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java
@@ -105,22 +105,18 @@ public class HBaseConnectorOptionsUtil {
         return builder.build();
     }
 
-    /**
-     * config HBase Configuration.
-     *
-     * @param options properties option
-     */
-    public static Configuration getHBaseConfiguration(Map<String, String> 
options) {
-        org.apache.flink.configuration.Configuration tableOptions =
-                org.apache.flink.configuration.Configuration.fromMap(options);
+    /** config HBase Configuration. */
+    public static Configuration getHBaseConfiguration(ReadableConfig 
tableOptions) {
         // create default configuration from current runtime env 
(`hbase-site.xml` in classpath)
         // first,
         Configuration hbaseClientConf = 
HBaseConfigurationUtil.getHBaseConfiguration();
-        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
tableOptions.getString(ZOOKEEPER_QUORUM));
+        hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
tableOptions.get(ZOOKEEPER_QUORUM));
         hbaseClientConf.set(
-                HConstants.ZOOKEEPER_ZNODE_PARENT, 
tableOptions.getString(ZOOKEEPER_ZNODE_PARENT));
+                HConstants.ZOOKEEPER_ZNODE_PARENT, 
tableOptions.get(ZOOKEEPER_ZNODE_PARENT));
         // add HBase properties
-        final Properties properties = getHBaseClientProperties(options);
+        final Properties properties =
+                getHBaseClientProperties(
+                        ((org.apache.flink.configuration.Configuration) 
tableOptions).toMap());
         properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), 
v.toString()));
         return hbaseClientConf;
     }

Reply via email to