huyuanfeng2018 commented on code in PR #5079:
URL: https://github.com/apache/inlong/pull/5079#discussion_r922988164
##########
inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/extract/KafkaExtractNode.java:
##########
@@ -86,14 +89,19 @@ public KafkaExtractNode(@JsonProperty("id") String id,
@Nonnull @JsonProperty("format") Format format,
@JsonProperty("scanStartupMode") KafkaScanStartupMode
kafkaScanStartupMode,
@JsonProperty("primaryKey") String primaryKey,
- @JsonProperty("groupId") String groupId) {
+ @JsonProperty("groupId") String groupId,
+ @JsonProperty("kafkaScanSpecificOffset") String
kafkaScanSpecificOffset) {
super(id, name, fields, watermarkField, properties);
this.topic = Preconditions.checkNotNull(topic, "kafka topic is empty");
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers,
"kafka bootstrapServers is empty");
this.format = Preconditions.checkNotNull(format, "kafka format is
empty");
this.kafkaScanStartupMode =
Preconditions.checkNotNull(kafkaScanStartupMode, "kafka scanStartupMode is
empty");
this.primaryKey = primaryKey;
this.groupId = groupId;
+ if (kafkaScanStartupMode == KafkaScanStartupMode.SPECIFIC_OFFSET) {
+ this.kafkaScanSpecificOffset
+ = Preconditions.checkNotNull(kafkaScanSpecificOffset,
"kafkaScanSpecificOffset is empty");
Review Comment:
Preconditions currently use the Guava library. There is no checknotEmpty
method, so I can only change it like this:
```
Preconditions.checkArgument(StringUtils.isNotEmpty(scanSpecificOffsets),
"scanSpecificOffsets is empty");
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]