Copilot commented on code in PR #9412:
URL: https://github.com/apache/gravitino/pull/9412#discussion_r2601817926
##########
lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java:
##########
@@ -91,6 +91,29 @@ void testTransformToEventListenerConfigs() {
Assertions.assertEquals("b", configs.get("sink2.a"));
}
+ @Test
+ public void testGenerateEventListenerConfigsEnforcesMinimumQueueCapacity() {
+ // Test case where integer division would result in 0 (2 / 3 = 0)
+ List<String> sinks = Arrays.asList("sinkA", "sinkB", "sinkC");
+ Map<String, String> lineageConfigs = new HashMap<>();
+ lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "2");
+
+ Map<String, String> eventListenerConfigs =
+ LineageSinkManager.transformToEventListenerConfigs(sinks, lineageConfigs);
+
+ sinks.forEach(
+ sink -> {
+ String capacityKey =
+ sink + "." + EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY;
+ String actualCapacity = eventListenerConfigs.get(capacityKey);
+
+ Assertions.assertEquals(
+ "1",
+ actualCapacity,
+ String.format("Queue capacity for %s should be at least 1", sink));
+ });
+ }
Review Comment:
Consider adding a test case for when `totalCapacity` is 0 or negative, to
confirm that the code handles invalid configurations appropriately. This would
verify whether the system fails fast with a clear error message or silently
accepts the invalid value.
Example test case:
```java
@Test
public void testInvalidQueueCapacityThrowsException() {
  List<String> sinks = Arrays.asList("sinkA", "sinkB");
  Map<String, String> lineageConfigs = new HashMap<>();
  lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "0");

  Assertions.assertThrows(
      IllegalArgumentException.class,
      () -> LineageSinkManager.transformToEventListenerConfigs(sinks, lineageConfigs));
}
```
##########
lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java:
##########
@@ -79,7 +79,9 @@ private static Map<String, String> generateEventListenerConfigs(
String queueCapacity =
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
Preconditions.checkArgument(
StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is not set");
- int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+ int totalCapacity = Integer.parseInt(queueCapacity);
Review Comment:
Consider adding validation to ensure `totalCapacity` is positive before the
division. If a user configures `LINEAGE_SINK_QUEUE_CAPACITY` to be 0 or
negative, the current code would set `capacityPerSink` to 1 (due to
`Math.max`), which may silently hide a configuration error.
Suggestion:
```java
int totalCapacity = Integer.parseInt(queueCapacity);
Preconditions.checkArgument(
    totalCapacity > 0, "Lineage sink queue capacity must be positive");
int capacityPerSink = Math.max(1, totalCapacity / sinks.size());
```
This would fail fast with a clear error message rather than silently
accepting an invalid configuration.
```suggestion
int totalCapacity = Integer.parseInt(queueCapacity);
Preconditions.checkArgument(
    totalCapacity > 0, "Lineage sink queue capacity must be positive");
```
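To illustrate the behaviour this suggestion is aiming for, here is a minimal, self-contained sketch in plain Java. It is not the Gravitino implementation: the class `QueueCapacitySketch` and the helper `capacityPerSink` are hypothetical names, Guava's `Preconditions.checkArgument` is replaced by an explicit check, and the sketch assumes `sinks` is non-empty.

```java
import java.util.Arrays;
import java.util.List;

public class QueueCapacitySketch {

  // Hypothetical helper mirroring the suggested logic; not the actual Gravitino method.
  // Assumes sinks is non-empty (an empty list would cause a division by zero).
  static int capacityPerSink(String queueCapacity, List<String> sinks) {
    int totalCapacity = Integer.parseInt(queueCapacity);
    // Reject 0 or negative totals up front instead of letting Math.max hide them.
    if (totalCapacity <= 0) {
      throw new IllegalArgumentException("Lineage sink queue capacity must be positive");
    }
    // Integer division can yield 0 when there are more sinks than capacity,
    // so clamp the per-sink value to at least 1.
    return Math.max(1, totalCapacity / sinks.size());
  }

  public static void main(String[] args) {
    List<String> sinks = Arrays.asList("sinkA", "sinkB", "sinkC");

    System.out.println(capacityPerSink("2", sinks)); // 2 / 3 == 0, clamped to 1
    System.out.println(capacityPerSink("9", sinks)); // 9 / 3 == 3

    try {
      capacityPerSink("0", sinks);
    } catch (IllegalArgumentException e) {
      // The invalid total is reported instead of silently becoming 1.
      System.out.println(e.getMessage());
    }
  }
}
```

Without the explicit check, a total of `0` or `-4` would also come out as `1` per sink, which is the silent acceptance described above.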