This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-1.1 by this push:
new 4a387c0715 [#9305] Improvement(lineage): in
LineageSinkManager:capacityPerSink has the possibility to be set to zero.
(#9433)
4a387c0715 is described below
commit 4a387c071562ce8fe5a1885d9ceae432df8f894d
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 10 11:07:56 2025 +0800
[#9305] Improvement(lineage): in LineageSinkManager:capacityPerSink has the
possibility to be set to zero. (#9433)
### What changes were proposed in this pull request?
This PR adds a default value of 1 for integer divisons that result in 0
at the level generateEventListenerConfigs methods.
the fix uses parseInt to parse into an int primitive instead of Integer
object since were storing into int, and Math.max(1, totalCapacity /
sinks.size()) to guarantee a minimum queue capacitypersink of 1.
### Why are the changes needed?
Previously, integer division operations in generateEventListenerConfigs
could result in a value of 0 capacityPerSink, when the total capacity is
less than the number of sinks. this violates EventListenerConfig
explicit rule that enforces a capacity being larger than 0. which leads
to an initialization failure when the configuration is loaded.
Fix: #9305
### Does this PR introduce _any_ user-facing change?
No, this is purely internal
### How was this patch tested?
Added unit test and ran the following:
./gradlew :common:compileJava
./gradlew :common:test
./gradlew :lineage:test
Co-authored-by: Salmane Khalili <[email protected]>
---
.../gravitino/lineage/sink/LineageSinkManager.java | 5 +++-
.../lineage/sink/TestLineageSinkManager.java | 34 ++++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
diff --git
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
index 19d07c4a93..ea30fb1029 100644
---
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -79,7 +79,10 @@ public class LineageSinkManager implements Closeable {
String queueCapacity =
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
Preconditions.checkArgument(
StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is
not set");
- int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+ int totalCapacity = Integer.parseInt(queueCapacity);
+ Preconditions.checkArgument(totalCapacity > 0, "Lineage sink queue
capacity must be positive");
+ int capacityPerSink = Math.max(1, totalCapacity / sinks.size());
eventListenerConfigs.put(
EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES, String.join(",",
sinks));
diff --git
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
index 6ccf6ac5d2..26918eaf26 100644
---
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
+++
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
@@ -91,6 +91,40 @@ public class TestLineageSinkManager {
Assertions.assertEquals("b", configs.get("sink2.a"));
}
+ @Test
+ public void testGenerateEventListenerConfigsEnforcesMinimumQueueCapacity() {
+ // Test case where integer division would result in 0 (2 / 3 = 0)
+ List<String> sinks = Arrays.asList("sinkA", "sinkB", "sinkC");
+ Map<String, String> lineageConfigs = new HashMap<>();
+ lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "2");
+
+ Map<String, String> eventListenerConfigs =
+ LineageSinkManager.transformToEventListenerConfigs(sinks,
lineageConfigs);
+
+ sinks.forEach(
+ sink -> {
+ String capacityKey =
+ sink + "." +
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY;
+ String actualCapacity = eventListenerConfigs.get(capacityKey);
+
+ Assertions.assertEquals(
+ "1",
+ actualCapacity,
+ String.format("Queue capacity for %s should be at least 1",
sink));
+ });
+ }
+
+ @Test
+ public void testInvalidQueueCapacityThrowsException() {
+ List<String> sinks = Arrays.asList("sinkA", "sinkB");
+ Map<String, String> lineageConfigs = new HashMap<>();
+ lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "0");
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> LineageSinkManager.transformToEventListenerConfigs(sinks,
lineageConfigs));
+ }
+
private void checkLineageSink(LineageSinkForTest sink) {
Map<String, String> configs = sink.getConfigs();
Assertions.assertTrue(configs.containsKey("name"));