shibd commented on code in PR #21675:
URL: https://github.com/apache/pulsar/pull/21675#discussion_r1418227509
##########
pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSourceConfig.java:
##########
@@ -152,8 +153,13 @@ public static KafkaSourceConfig load(String yamlFile)
throws IOException {
return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
}
- public static KafkaSourceConfig load(Map<String, Object> map) throws
IOException {
+ public static KafkaSourceConfig load(Map<String, Object> map,
SourceContext sourceContext) throws IOException {
ObjectMapper mapper = new ObjectMapper();
+ try {
+ map.put("sslTruststorePassword",
sourceContext.getSecret("sslTruststorePassword"));
+ } catch (Exception e) {
+ // ignore
+ }
mapper.enable(DeserializationFeature.ACCEPT_EMPTY_STRING_AS_NULL_OBJECT);
Review Comment:
This should be the default behavior for `IOConfigUtils.loadWithSecrets`. You
can add this configuration to IOConfigUtils, and here, use IOConfigUtils.
##########
pulsar-io/influxdb/src/test/java/org/apache/pulsar/io/influxdb/v1/InfluxDBSinkConfigTest.java:
##########
@@ -85,12 +123,13 @@ public final void validValidateTest() throws IOException {
map.put("batchTimeMs", "1000");
map.put("batchSize", "100");
- InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ InfluxDBSinkConfig config = InfluxDBSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "influxdbUrl property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "influxdbUrl cannot be null")
Review Comment:
We can remove the original validation logic:
https://github.com/apache/pulsar/pull/21675/files#diff-3e4c907ca693ce293bc5e96a62e520e4c14d73e70fae8775ae3ea812f16db5fdR113
##########
pulsar-io/common/src/main/java/org/apache/pulsar/io/common/IOConfigUtils.java:
##########
@@ -77,13 +77,14 @@ private static <T> T loadWithSecrets(Map<String, Object>
map, Class<T> clazz,
}
}
configs.computeIfAbsent(field.getName(), key -> {
- if (fieldDoc.required()) {
Review Comment:
It's better to add a unit test to cover this change.
##########
pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkConfigTest.java:
##########
@@ -44,12 +66,13 @@ public void testLoadMapConfig() throws IOException {
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required MongoDB URI is not
set.")
+ expectedExceptionsMessageRegExp = "mongoUri cannot be null")
Review Comment:
Need to clean up the original validate logic.
##########
pulsar-io/redis/src/test/java/org/apache/pulsar/io/redis/sink/RedisSinkConfigTest.java:
##########
@@ -86,12 +115,13 @@ public final void validValidateTest() throws IOException {
map.put("batchTimeMs", "1000");
map.put("connectTimeout", "3000");
- RedisSinkConfig config = RedisSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RedisSinkConfig config = RedisSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "redisHosts property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
Review Comment:
Need to clean up the original validate logic.
##########
pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java:
##########
@@ -105,12 +145,13 @@ public final void validValidateTest() throws IOException {
map.put("exchangeName", "test-exchange");
map.put("exchangeType", "test-exchange-type");
- RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ SinkContext sinkContext = Mockito.mock(SinkContext.class);
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map, sinkContext);
config.validate();
}
- @Test(expectedExceptions = NullPointerException.class,
- expectedExceptionsMessageRegExp = "exchangeName property not set.")
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "exchangeName cannot be null")
Review Comment:
Need to clean up the original validate logic.
##########
pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java:
##########
@@ -193,12 +193,12 @@ public void fatal(Throwable t) {
sink.close();
}
};
- expectThrows(NullPointerException.class, "Kafka topic is not set",
openAndClose);
- config.put("topic", "topic_2");
- expectThrows(NullPointerException.class, "Kafka bootstrapServers is
not set", openAndClose);
+ expectThrows(IllegalArgumentException.class, "bootstrapServers cannot
be null", openAndClose);
config.put("bootstrapServers", "localhost:6667");
- expectThrows(NullPointerException.class, "Kafka acks mode is not set",
openAndClose);
Review Comment:
Need to clean up the original validate logic.
##########
pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceConfigTest.java:
##########
@@ -43,12 +65,13 @@ public void testLoadMapConfig() throws IOException {
}
@Test(expectedExceptions = IllegalArgumentException.class,
- expectedExceptionsMessageRegExp = "Required MongoDB URI is not
set.")
+ expectedExceptionsMessageRegExp = "mongoUri cannot be null")
Review Comment:
Need to clean up the original validate logic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]