Hisoka-X commented on code in PR #7169:
URL: https://github.com/apache/seatunnel/pull/7169#discussion_r1725019067


##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java:
##########
@@ -126,44 +132,23 @@ public void testHbaseSinkWithArray(TestContainer 
container)
         scanner.close();
     }
 
-    @TestTemplate
-    public void testHbaseSinkAssignCfSink(TestContainer container)
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "Currently SPARK/FLINK do not support multiple 
table write")

Review Comment:
   Spark supported now.



##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hbase-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hbase/HbaseIT.java:
##########
@@ -126,44 +132,23 @@ public void testHbaseSinkWithArray(TestContainer 
container)
         scanner.close();
     }
 
-    @TestTemplate
-    public void testHbaseSinkAssignCfSink(TestContainer container)

Review Comment:
   Why removed this test case?



##########
seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java:
##########
@@ -59,4 +71,50 @@ public OptionRule optionRule() {
                         HBASE_EXTRA_CONFIG)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        ReadonlyConfig readonlyConfig = context.getOptions();
+        CatalogTable catalogTable = context.getCatalogTable();
+        ReadonlyConfig finalReadonlyConfig =
+                generateCurrentReadonlyConfig(readonlyConfig, catalogTable);
+        HbaseParameters hbaseParameters = 
HbaseParameters.buildWithConfig(finalReadonlyConfig);
+        return () -> new HbaseSink(hbaseParameters, catalogTable);
+    }
+
+    private ReadonlyConfig generateCurrentReadonlyConfig(
+            ReadonlyConfig readonlyConfig, CatalogTable catalogTable) {
+        Map<String, String> configMap = readonlyConfig.toMap();
+
+        readonlyConfig
+                .getOptional(TABLE)
+                .ifPresent(
+                        tableName -> {
+                            String replacedPath =
+                                    replaceCatalogTableInPath(tableName, 
catalogTable);
+                            configMap.put(TABLE.key(), replacedPath);
+                        });
+
+        return ReadonlyConfig.fromMap(new HashMap<>(configMap));
+    }
+
+    private String replaceCatalogTableInPath(String originTableName, 
CatalogTable catalogTable) {
+        String tableName = originTableName;
+        TableIdentifier tableIdentifier = catalogTable.getTableId();
+        if (tableIdentifier != null) {
+            if (tableIdentifier.getSchemaName() != null) {
+                tableName =
+                        tableName.replace(
+                                
SinkReplaceNameConstant.REPLACE_SCHEMA_NAME_KEY,
+                                tableIdentifier.getSchemaName());
+            }
+            if (tableIdentifier.getTableName() != null) {
+                tableName =
+                        tableName.replace(
+                                SinkReplaceNameConstant.REPLACE_TABLE_NAME_KEY,
+                                tableIdentifier.getTableName());
+            }
+        }
+        return tableName;
+    }

Review Comment:
   We supported this feature in 
https://github.com/apache/seatunnel/blob/570bbb39a0de9deed60e4089823b80bc59ec3263/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java#L45



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to