leonardBang commented on a change in pull request #14536:
URL: https://github.com/apache/flink/pull/14536#discussion_r554998587



##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -104,10 +106,14 @@
                                     + "Can be set to '0' to disable it. Note, 
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush 
interval set allowing for complete async processing of buffered actions.");
 
+    // Prefix for Hbase specific properties.

Review comment:
       ```suggestion
       // Prefix for HBase specific properties.
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -88,7 +90,14 @@
         descriptorProperties
                 .getOptionalString(CONNECTOR_ZK_NODE_PARENT)
                 .ifPresent(v -> 
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
-
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -80,6 +81,15 @@
         final DescriptorProperties descriptorProperties = 
getValidatedProperties(properties);
         Configuration hbaseClientConf = getHConf(descriptorProperties);
 
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-base/src/main/java/org/apache/flink/connector/hbase/source/AbstractHBaseDynamicTableSource.java
##########
@@ -114,4 +117,29 @@ public String asSummaryString() {
     public HBaseTableSchema getHBaseTableSchema() {
         return this.hbaseSchema;
     }
+
+    // get hbase table properties which start with prefix

Review comment:
       ```suggestion
       // get HBase table properties which start with prefix
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -118,6 +124,10 @@ public DynamicTableSource createDynamicTableSource(Context 
context) {
         hbaseClientConf.set(HConstants.ZOOKEEPER_QUORUM, 
helper.getOptions().get(ZOOKEEPER_QUORUM));
         hbaseClientConf.set(
                 HConstants.ZOOKEEPER_ZNODE_PARENT, 
helper.getOptions().get(ZOOKEEPER_ZNODE_PARENT));
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -106,6 +115,16 @@
         descriptorProperties
                 .getOptionalString(CONNECTOR_ZK_NODE_PARENT)
                 .ifPresent(hbaseOptionsBuilder::setZkNodeParent);
+        Properties hbaseProperties = new Properties();
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
##########
@@ -103,31 +105,46 @@
                                     + "Can be set to '0' to disable it. Note, 
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush 
interval set allowing for complete async processing of buffered actions.");
 
+    // Prefix for Hbase specific properties.

Review comment:
       ```suggestion
       // Prefix for HBase specific properties.
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -96,6 +106,16 @@
                 
TableSchemaUtils.getPhysicalSchema(descriptorProperties.getTableSchema(SCHEMA));
         HBaseTableSchema hbaseSchema = validateTableSchema(tableSchema);
 
+        Configuration hbaseClientConf = getHConf(descriptorProperties);
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/sink/HBaseUpsertTableSink.java
##########
@@ -96,6 +96,13 @@ public TableSchema getTableSchema() {
         hbaseOptions
                 .getZkNodeParent()
                 .ifPresent(v -> 
hbaseClientConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, v));
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1TableFactory.java
##########
@@ -200,6 +219,9 @@ private DescriptorProperties 
getValidatedProperties(Map<String, String> properti
         properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
         properties.add(SCHEMA + "." + 
DescriptorProperties.PRIMARY_KEY_COLUMNS);
 
+        // hbase properties

Review comment:
       ```suggestion
           // HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java
##########
@@ -103,31 +105,46 @@
                                     + "Can be set to '0' to disable it. Note, 
both 'sink.buffer-flush.max-size' and 'sink.buffer-flush.max-rows' "
                                     + "can be set to '0' with the flush 
interval set allowing for complete async processing of buffered actions.");
 
+    // Prefix for Hbase specific properties.
+    public static final String PROPERTIES_PREFIX = "properties.";
+
     @Override
     public DynamicTableSource createDynamicTableSource(Context context) {
         TableFactoryHelper helper = createTableFactoryHelper(this, context);
-        helper.validate();
+        helper.validateExcept(PROPERTIES_PREFIX);
+
         TableSchema tableSchema = context.getCatalogTable().getSchema();
         validatePrimaryKey(tableSchema);
 
         String hTableName = helper.getOptions().get(TABLE_NAME);
 
+        Configuration hbaseConf = getHbaseConf(helper);
+        // add hbase properties
+        final Properties properties =
+                getSpecifyProperties(context.getCatalogTable().getOptions(), 
PROPERTIES_PREFIX);
+        properties.forEach((k, v) -> hbaseConf.set(k.toString(), 
v.toString()));
+
         String nullStringLiteral = 
helper.getOptions().get(NULL_STRING_LITERAL);
         HBaseTableSchema hbaseSchema = 
HBaseTableSchema.fromTableSchema(tableSchema);
 
-        return new HBaseDynamicTableSource(
-                getHbaseConf(helper), hTableName, hbaseSchema, 
nullStringLiteral);
+        return new HBaseDynamicTableSource(hbaseConf, hTableName, hbaseSchema, 
nullStringLiteral);
     }
 
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         TableFactoryHelper helper = createTableFactoryHelper(this, context);
-        helper.validate();
+        helper.validateExcept(PROPERTIES_PREFIX);
         TableSchema tableSchema = context.getCatalogTable().getSchema();
         validatePrimaryKey(tableSchema);
 
         String hTableName = helper.getOptions().get(TABLE_NAME);
 
+        Configuration hbaseConf = getHbaseConf(helper);
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2TableFactory.java
##########
@@ -188,6 +208,9 @@ private DescriptorProperties 
getValidatedProperties(Map<String, String> properti
         properties.add(SCHEMA + "." + DescriptorProperties.PRIMARY_KEY_NAME);
         properties.add(SCHEMA + "." + 
DescriptorProperties.PRIMARY_KEY_COLUMNS);
 
+        // hbase properties

Review comment:
       ```suggestion
           // HBase properties
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/options/HBaseOptions.java
##########
@@ -120,11 +136,18 @@ public Builder setZkNodeParent(String zkNodeParent) {
             return this;
         }
 
+        /** Optional. Sets hbase properties for hbase configuration. */

Review comment:
       ```suggestion
           /** Optional. Sets HBase properties for HBase configuration. */
   ```

##########
File path: 
flink-connectors/flink-connector-hbase-1.4/src/main/java/org/apache/flink/connector/hbase1/HBase1DynamicTableFactory.java
##########
@@ -128,14 +138,20 @@ public DynamicTableSource 
createDynamicTableSource(Context context) {
     @Override
     public DynamicTableSink createDynamicTableSink(Context context) {
         TableFactoryHelper helper = createTableFactoryHelper(this, context);
-        helper.validate();
+        helper.validateExcept(PROPERTIES_PREFIX);
+
         TableSchema tableSchema = context.getCatalogTable().getSchema();
         validatePrimaryKey(tableSchema);
 
+        // add hbase properties

Review comment:
       ```suggestion
           // add HBase properties
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to