leonardBang commented on a change in pull request #14536:
URL: https://github.com/apache/flink/pull/14536#discussion_r554998587
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -104,10 +106,14 @@
+ "Can be set to '0' to disable it. Note,
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush
interval set allowing for complete async processing of buffered actions.");
+ // Prefix for Hbase specific properties.
Review comment:
```suggestion
// Prefix for HBase specific properties.
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -88,7 +90,14 @@
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(v ->
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
-
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -80,6 +81,15 @@
final DescriptorProperties descriptorProperties =
getValidatedProperties(properties);
Configuration hbaseClientConf = getHConf(descriptorProperties);
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
##########
@@ -114,4 +117,29 @@ public String asSummaryString() {
public HBaseTableSchema getHBaseTableSchema() {
return this.hbaseSchema;
}
+
+ // get hbase table properties which start with prefix
Review comment:
```suggestion
// get HBase table properties which start with prefix
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -118,6 +124,10 @@ public DynamicTableSource createDynamicTableSource(Context
context) {
hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM,
helper.getOptions().get(ZOOKEEPER_QUORUM));
hbaseClientConf.set(
HConstants.ZOOKEEPER_ZNODE_PARENT,
helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -106,6 +115,16 @@
descriptorProperties
.getOptionalString(CONNECTOR_ZK_NODE_PARENT)
.ifPresent(hbaseOptionsBuilder::setZkNodeParent);
+ Properties hbaseProperties = new Properties();
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
##########
@@ -103,31 +105,46 @@
+ "Can be set to '0' to disable it. Note,
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush
interval set allowing for complete async processing of buffered actions.");
+ // Prefix for Hbase specific properties.
Review comment:
```suggestion
// Prefix for HBase specific properties.
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -96,6 +106,16 @@
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
+ Configuration hbaseClientConf = getHConf(descriptorProperties);
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseUpsertTableSink.java
##########
@@ -96,6 +96,13 @@ public TableSchema getTableSchema() {
hbaseOptions
.getZkNodeParent()
.ifPresent(v ->
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -200,6 +219,9 @@ private DescriptorProperties
getValidatedProperties(Map<String, String> properti
properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
properties.add(SCHEMA + "." +
DescriptorProperties.PRIMARY_KEY_COLUMNS);
+ // hbase properties
Review comment:
```suggestion
// HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
##########
@@ -103,31 +105,46 @@
+ "Can be set to '0' to disable it. Note,
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
+ "can be set to '0' with the flush
interval set allowing for complete async processing of buffered actions.");
+ // Prefix for Hbase specific properties.
+ public static final String PROPERTIES_PREFIX = "properties.";
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
- helper.validate();
+ helper.validateExcept(PROPERTIES_PREFIX);
+
TableSchema tableSchema = context.getCatalogTable().getSchema();
validatePrimaryKey(tableSchema);
String hTableName = helper.getOptions().get(TABLE_NAME);
+ Configuration hbaseConf = getHbaseConf(helper);
+ // add hbase properties
+ final Properties properties =
+ getSpecifyProperties(context.getCatalogTable().getOptions(),
PROPERTIES_PREFIX);
+ properties.forEach((k, v) -> hbaseConf.set(k.toString(),
v.toString()));
+
String nullStringLiteral =
helper.getOptions().get(NULL_STRING_LITERAL);
HBaseTableSchema hbaseSchema =
HBaseTableSchema.fromTableSchema(tableSchema);
- return new HBaseDynamicTableSource(
- getHbaseConf(helper), hTableName, hbaseSchema,
nullStringLiteral);
+ return new HBaseDynamicTableSource(hbaseConf, hTableName, hbaseSchema,
nullStringLiteral);
}
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
- helper.validate();
+ helper.validateExcept(PROPERTIES_PREFIX);
TableSchema tableSchema = context.getCatalogTable().getSchema();
validatePrimaryKey(tableSchema);
String hTableName = helper.getOptions().get(TABLE_NAME);
+ Configuration hbaseConf = getHbaseConf(helper);
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -188,6 +208,9 @@ private DescriptorProperties
getValidatedProperties(Map<String, String> properti
properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
properties.add(SCHEMA + "." +
DescriptorProperties.PRIMARY_KEY_COLUMNS);
+ // hbase properties
Review comment:
```suggestion
// HBase properties
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/options/HBaseOptions.java
##########
@@ -120,11 +136,18 @@ public Builder setZkNodeParent(String zkNodeParent) {
return this;
}
+ /** Optional. Sets hbase properties for hbase configuration. */
Review comment:
```suggestion
/** Optional. Sets HBase properties for hbase configuration. */
```
##########
File path:
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -128,14 +138,20 @@ public DynamicTableSource
createDynamicTableSource(Context context) {
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
TableFactoryHelper helper = createTableFactoryHelper(this, context);
- helper.validate();
+ helper.validateExcept(PROPERTIES_PREFIX);
+
TableSchema tableSchema = context.getCatalogTable().getSchema();
validatePrimaryKey(tableSchema);
+ // add hbase properties
Review comment:
```suggestion
// add HBase properties
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]