EricJoy2048 commented on code in PR #6833:
URL: https://github.com/apache/seatunnel/pull/6833#discussion_r1600988952
##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java:
##########
@@ -41,19 +42,18 @@ public String factoryIdentifier() {
@Override
public OptionRule optionRule() {
return OptionRule.builder()
- .required(
- PaimonSinkConfig.WAREHOUSE,
- PaimonSinkConfig.DATABASE,
- PaimonSinkConfig.TABLE)
+ .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE)
.optional(
- PaimonSinkConfig.HDFS_SITE_PATH,
+ PaimonConfig.HDFS_SITE_PATH,
+ PaimonConfig.HADOOP_CONF,
+ PaimonConfig.HADOOP_CONF_PATH,
+ PaimonConfig.CATALOG_TYPE,
+ PaimonConfig.CATALOG_URI,
Review Comment:
Please use `conditional` here.
##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java:
##########
@@ -52,13 +52,15 @@ public OptionRule optionRule() {
.required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE)
.optional(
PaimonConfig.HDFS_SITE_PATH,
+ PaimonConfig.HADOOP_CONF,
+ PaimonConfig.HADOOP_CONF_PATH,
+ PaimonConfig.CATALOG_TYPE,
+ PaimonConfig.CATALOG_URI,
Review Comment:
Please use `conditional`, because `PaimonConfig.CATALOG_URI` is only needed
when `PaimonConfig.CATALOG_TYPE` is hive.
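To illustrate the rule being asked for, here is a minimal, self-contained sketch of "required only when another option has a given value". It does not use SeaTunnel's `OptionRule`; the class `ConditionalRuleSketch` and the key names other than `catalog_type` are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a conditional option rule; not SeaTunnel's OptionRule.
final class ConditionalRuleSketch {

    /** Returns the option keys that are missing from the given config. */
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        // Always required, mirroring the .required(...) call in the diff.
        // The key names "warehouse", "database" and "table" are assumed.
        for (String key : new String[] {"warehouse", "database", "table"}) {
            if (!config.containsKey(key)) {
                missing.add(key);
            }
        }
        // Conditionally required: the (assumed) key "catalog_uri" is only
        // demanded when catalog_type is "hive".
        if ("hive".equals(config.get("catalog_type")) && !config.containsKey("catalog_uri")) {
            missing.add("catalog_uri");
        }
        return missing;
    }
}
```

With a rule like this, a filesystem catalog config passes without a URI, while a hive catalog config without one is rejected before the job starts.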
##########
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java:
##########
@@ -124,4 +149,50 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
});
});
}
+
+ @TestTemplate
+ public void testFakeCDCSinkPaimonWithHiveCatalog(TestContainer container) throws Exception {
+ Container.ExecResult execResult =
+ container.executeJob("/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(200L, TimeUnit.MILLISECONDS)
+ .atMost(40L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+ PaimonSinkConfig paimonSinkConfig =
+ new PaimonSinkConfig(
+ ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+ PaimonCatalogLoader paimonCatalogLoader =
+ new PaimonCatalogLoader(paimonSinkConfig);
+ Catalog catalog = paimonCatalogLoader.loadCatalog();
+ Table table =
+ catalog.getTable(
+ Identifier.create("seatunnel_namespace1", "st_test"));
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+ List<PaimonRecord> paimonRecords = new ArrayList<>();
+ try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+ reader.forEachRemaining(
+ row ->
+ paimonRecords.add(
+ new PaimonRecord(
+ row.getLong(0),
+ row.getString(1).toString())));
+ }
+ Assertions.assertEquals(2, paimonRecords.size());
+ paimonRecords.forEach(
+ paimonRecord -> {
+ if (paimonRecord.getPkId() == 1) {
Review Comment:
For SeaTunnel, e2e tests are the only way to ensure that the code functions
properly. Please cover all data types that Paimon supports in the e2e test. You
can construct these types through FakeSource and then check that the type of
each field in Paimon matches your expectations. Finally, you also need to check
that the value of each field in each row meets expectations.
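A rough sketch of the kind of check meant here, independent of the Paimon and SeaTunnel APIs: compare an expected field-to-type (or field-to-value) map against what was read back, and report every mismatch. The class `FieldCheckSketch` and the field and type names in the usage note are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical helper for an e2e assertion; not part of SeaTunnel or Paimon.
final class FieldCheckSketch {

    /**
     * Compares expected entries (field name to type name, or field name to value)
     * with the actual ones and returns one message per mismatch.
     */
    static List<String> mismatches(Map<String, ?> expected, Map<String, ?> actual) {
        List<String> problems = new ArrayList<>();
        for (Map.Entry<String, ?> entry : expected.entrySet()) {
            Object actualValue = actual.get(entry.getKey());
            if (!Objects.equals(entry.getValue(), actualValue)) {
                problems.add(
                        entry.getKey()
                                + ": expected "
                                + entry.getValue()
                                + " but was "
                                + actualValue);
            }
        }
        return problems;
    }
}
```

In the test, the same helper could be called once with the expected type names per field and once per row with the expected values, asserting that the returned list is empty each time.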
##########
seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java:
##########
@@ -50,11 +54,23 @@ public class PaimonConfig implements Serializable {
.noDefaultValue()
.withDescription("The warehouse path of paimon");
+ public static final Option<String> CATALOG_TYPE =
+ Options.key("catalog_type")
+ .stringType()
Review Comment:
Would `.enumType(PaimonCatalogEnum.class)` be better?
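The benefit of an enum-typed option is that unsupported values fail at parse time instead of deep inside catalog loading. A minimal sketch of that behaviour follows; the enum `CatalogTypeSketch` and its constants are assumptions, since the constants of `PaimonCatalogEnum` are not shown in this diff.

```java
import java.util.Locale;

// Hypothetical enum for illustration; the real PaimonCatalogEnum may differ.
enum CatalogTypeSketch {
    FILESYSTEM,
    HIVE;

    /** Parses a raw config value case-insensitively and rejects unknown values early. */
    static CatalogTypeSketch parse(String raw) {
        try {
            return valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException("Unsupported catalog_type: " + raw, e);
        }
    }
}
```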
##########
seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java:
##########
@@ -139,6 +140,14 @@ public static SeaTunnelRuntimeException convertToConnectorTypeError(
return new SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR, params);
}
+ public static SeaTunnelRuntimeException convertToConnectorPropsBlankError(
+ String connector, String props) {
+ Map<String, String> params = new HashMap<>();
+ params.put("connector", connector);
Review Comment:
Do we need to let the user know whether the error is in the `Source` or the `Sink`?
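One possible shape, as a sketch only: pass the plugin kind alongside the connector name so the message names the failing side. The class `BlankPropsErrorSketch`, the enum `PluginKind`, and the message wording are all hypothetical and are not the existing `CommonError` API.

```java
import java.util.Locale;

// Hypothetical sketch; not the actual CommonError implementation.
final class BlankPropsErrorSketch {

    // Assumed enum for illustration; the code base may model this differently.
    enum PluginKind {
        SOURCE,
        SINK
    }

    /** Builds an error message that states which side (source or sink) is misconfigured. */
    static String blankPropsMessage(String connector, PluginKind kind, String props) {
        String kindName = kind.name().toLowerCase(Locale.ROOT);
        return String.format(
                "'%s' %s: option '%s' must not be blank", connector, kindName, props);
    }
}
```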