This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5175ed0d48835344c1cd4282372d6b01571d914b Author: slinkydeveloper <[email protected]> AuthorDate: Thu Jan 6 16:35:09 2022 +0100 [FLINK-25391][connector-hbase] Forward catalog table options --- docs/content/docs/connectors/table/hbase.md | 17 ++++++++++++- .../hbase1/HBase1DynamicTableFactory.java | 29 ++++++++++++++++------ .../hbase2/HBase2DynamicTableFactory.java | 27 ++++++++++++++------ .../hbase/table/HBaseConnectorOptionsUtil.java | 18 ++++++-------- 4 files changed, 65 insertions(+), 26 deletions(-) diff --git a/docs/content/docs/connectors/table/hbase.md b/docs/content/docs/connectors/table/hbase.md index 86d45e5..21436cd 100644 --- a/docs/content/docs/connectors/table/hbase.md +++ b/docs/content/docs/connectors/table/hbase.md @@ -82,15 +82,17 @@ Connector Options <tr> <th class="text-left" style="width: 25%">Option</th> <th class="text-center" style="width: 8%">Required</th> + <th class="text-center" style="width: 8%">Forwarded</th> <th class="text-center" style="width: 7%">Default</th> <th class="text-center" style="width: 10%">Type</th> - <th class="text-center" style="width: 50%">Description</th> + <th class="text-center" style="width: 42%">Description</th> </tr> </thead> <tbody> <tr> <td><h5>connector</h5></td> <td>required</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>Specify what connector to use, valid values are: @@ -103,6 +105,7 @@ Connector Options <tr> <td><h5>table-name</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The name of HBase table to connect. By default, the table is in 'default' namespace. To assign the table a specified namespace you need to use 'namespace:table'.</td> @@ -110,6 +113,7 @@ Connector Options <tr> <td><h5>zookeeper.quorum</h5></td> <td>required</td> + <td>yes</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td>The HBase Zookeeper quorum.</td> @@ -117,6 +121,7 @@ Connector Options <tr> <td><h5>zookeeper.znode.parent</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">/hbase</td> <td>String</td> <td>The root dir in Zookeeper for HBase cluster.</td> @@ -124,6 +129,7 @@ Connector Options <tr> <td><h5>null-string-literal</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">null</td> <td>String</td> <td>Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.</td> @@ -131,6 +137,7 @@ Connector Options <tr> <td><h5>sink.buffer-flush.max-size</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">2mb</td> <td>MemorySize</td> <td>Writing option, maximum size in memory of buffered rows for each writing request. @@ -141,6 +148,7 @@ Connector Options <tr> <td><h5>sink.buffer-flush.max-rows</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">1000</td> <td>Integer</td> <td>Writing option, maximum number of rows to buffer for each writing request. @@ -151,6 +159,7 @@ Connector Options <tr> <td><h5>sink.buffer-flush.interval</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">1s</td> <td>Duration</td> <td>Writing option, the interval to flush any buffered rows. @@ -162,6 +171,7 @@ Connector Options <tr> <td><h5>sink.parallelism</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>Integer</td> <td>Defines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.</td> @@ -169,6 +179,7 @@ Connector Options <tr> <td><h5>lookup.async</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">false</td> <td>Boolean</td> <td>Whether async lookup are enabled. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.</td> @@ -176,6 +187,7 @@ Connector Options <tr> <td><h5>lookup.cache.max-rows</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">-1</td> <td>Long</td> <td>The max number of rows of lookup cache, over this value, the oldest rows will be expired. Note, "lookup.cache.max-rows" and "lookup.cache.ttl" options must all be specified if any of them is specified. Lookup cache is disabled by default.</td> @@ -183,6 +195,7 @@ Connector Options <tr> <td><h5>lookup.cache.ttl</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">0 s</td> <td>Duration</td> <td>The max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Note, "cache.max-rows" and "cache.ttl" options must all be specified if any of them is specified.Lookup cache is disabled by default.</td> @@ -190,6 +203,7 @@ Connector Options <tr> <td><h5>lookup.max-retries</h5></td> <td>optional</td> + <td>yes</td> <td style="word-wrap: break-word;">3</td> <td>Integer</td> <td>The max retry times if lookup database failed.</td> @@ -197,6 +211,7 @@ Connector Options <tr> <td><h5>properties.*</h5></td> <td>optional</td> + <td>no</td> <td style="word-wrap: break-word;">(none)</td> <td>String</td> <td> diff --git a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java index 3454064..6a3e6ba 100644 --- a/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java +++ b/flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java @@ -34,9 +34,11 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.hadoop.conf.Configuration; import java.util.HashSet; -import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_TTL; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_MAX_RETRIES; @@ -69,12 +71,10 @@ public class HBase1DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); - Configuration hbaseClientConf = getHBaseConfiguration(options); + Configuration hbaseClientConf = getHBaseConfiguration(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = HBaseTableSchema.fromDataType(context.getPhysicalRowDataType()); @@ -94,12 +94,10 @@ public class HBase1DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); - Configuration hbaseConf = getHBaseConfiguration(options); + Configuration hbaseConf = getHBaseConfiguration(tableOptions); HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = @@ -131,9 +129,26 @@ public class HBase1DynamicTableFactory set.add(SINK_BUFFER_FLUSH_MAX_ROWS); set.add(SINK_BUFFER_FLUSH_INTERVAL); set.add(SINK_PARALLELISM); + set.add(LOOKUP_ASYNC); set.add(LOOKUP_CACHE_MAX_ROWS); set.add(LOOKUP_CACHE_TTL); set.add(LOOKUP_MAX_RETRIES); return set; } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + TABLE_NAME, + ZOOKEEPER_ZNODE_PARENT, + ZOOKEEPER_QUORUM, + NULL_STRING_LITERAL, + SINK_BUFFER_FLUSH_MAX_SIZE, + SINK_BUFFER_FLUSH_MAX_ROWS, + SINK_BUFFER_FLUSH_INTERVAL, + LOOKUP_CACHE_MAX_ROWS, + LOOKUP_CACHE_TTL, + LOOKUP_MAX_RETRIES) + .collect(Collectors.toSet()); + } } diff --git a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java index c9246b6..9b9a525 100644 --- a/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java +++ b/flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java @@ -35,8 +35,9 @@ import org.apache.flink.table.factories.FactoryUtil.TableFactoryHelper; import org.apache.hadoop.conf.Configuration; import java.util.HashSet; -import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_ASYNC; import static org.apache.flink.connector.hbase.table.HBaseConnectorOptions.LOOKUP_CACHE_MAX_ROWS; @@ -71,12 +72,10 @@ public class HBase2DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); - Configuration hbaseConf = getHBaseConfiguration(options); + Configuration hbaseConf = getHBaseConfiguration(tableOptions); HBaseLookupOptions lookupOptions = getHBaseLookupOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = @@ -93,12 +92,10 @@ public class HBase2DynamicTableFactory final ReadableConfig tableOptions = helper.getOptions(); - Map<String, String> options = context.getCatalogTable().getOptions(); - validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes()); String tableName = tableOptions.get(TABLE_NAME); - Configuration hbaseConf = getHBaseConfiguration(options); + Configuration hbaseConf = getHBaseConfiguration(tableOptions); HBaseWriteOptions hBaseWriteOptions = getHBaseWriteOptions(tableOptions); String nullStringLiteral = tableOptions.get(NULL_STRING_LITERAL); HBaseTableSchema hbaseSchema = @@ -136,4 +133,20 @@ public class HBase2DynamicTableFactory set.add(LOOKUP_MAX_RETRIES); return set; } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + TABLE_NAME, + ZOOKEEPER_ZNODE_PARENT, + ZOOKEEPER_QUORUM, + NULL_STRING_LITERAL, + LOOKUP_CACHE_MAX_ROWS, + LOOKUP_CACHE_TTL, + LOOKUP_MAX_RETRIES, + SINK_BUFFER_FLUSH_MAX_SIZE, + SINK_BUFFER_FLUSH_MAX_ROWS, + SINK_BUFFER_FLUSH_INTERVAL) + .collect(Collectors.toSet()); + } } diff --git a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java index 6b9804f..4585610 100644 --- a/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java +++ b/flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/table/HBaseConnectorOptionsUtil.java @@ -105,22 +105,18 @@ public class HBaseConnectorOptionsUtil { return builder.build(); } - /** - * config HBase Configuration. - * - * @param options properties option - */ - public static Configuration getHBaseConfiguration(Map<String, String> options) { - org.apache.flink.configuration.Configuration tableOptions = - org.apache.flink.configuration.Configuration.fromMap(options); + /** config HBase Configuration. */ + public static Configuration getHBaseConfiguration(ReadableConfig tableOptions) { // create default configuration from current runtime env (`hbase-site.xml` in classpath) // first, Configuration hbaseClientConf = HBaseConfigurationUtil.getHBaseConfiguration(); - hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.getString(ZOOKEEPER_QUORUM)); + hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, tableOptions.get(ZOOKEEPER_QUORUM)); hbaseClientConf.set( - HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.getString(ZOOKEEPER_ZNODE_PARENT)); + HConstants.ZOOKEEPER_ZNODE_PARENT, tableOptions.get(ZOOKEEPER_ZNODE_PARENT)); // add HBase properties - final Properties properties = getHBaseClientProperties(options); + final Properties properties = + getHBaseClientProperties( + ((org.apache.flink.configuration.Configuration) tableOptions).toMap()); properties.forEach((k, v) -> hbaseClientConf.set(k.toString(), v.toString())); return hbaseClientConf; }
