This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-1.1
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-1.1 by this push:
     new 4a387c0715 [#9305] Improvement(lineage): in 
LineageSinkManager:capacityPerSink has the possibility to be set to zero. 
(#9433)
4a387c0715 is described below

commit 4a387c071562ce8fe5a1885d9ceae432df8f894d
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Dec 10 11:07:56 2025 +0800

    [#9305] Improvement(lineage): in LineageSinkManager:capacityPerSink has the 
possibility to be set to zero. (#9433)
    
    ### What changes were proposed in this pull request?
    
    This PR adds a default value of 1 for integer divisons that result in 0
    at the level generateEventListenerConfigs methods.
    
    the fix uses parseInt to parse into an int primitive instead of Integer
    object since were storing into int, and Math.max(1, totalCapacity /
    sinks.size()) to guarantee a minimum queue capacitypersink of 1.
    
    ### Why are the changes needed?
    
    Previously, integer division operations in generateEventListenerConfigs
    could result in a value of 0 capacityPerSink, when the total capacity is
    less than the number of sinks. this violates EventListenerConfig
    explicit rule that enforces a capacity being larger than 0. which leads
    to an initialization failure when the configuration is loaded.
    
    Fix: #9305
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is purely internal
    
    ### How was this patch tested?
    
    Added unit test and ran the following:
    ./gradlew :common:compileJava
    ./gradlew :common:test
    ./gradlew :lineage:test
    
    Co-authored-by: Salmane Khalili <[email protected]>
---
 .../gravitino/lineage/sink/LineageSinkManager.java |  5 +++-
 .../lineage/sink/TestLineageSinkManager.java       | 34 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
index 19d07c4a93..ea30fb1029 100644
--- 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -79,7 +79,10 @@ public class LineageSinkManager implements Closeable {
     String queueCapacity = 
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
     Preconditions.checkArgument(
         StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is 
not set");
-    int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+    int totalCapacity = Integer.parseInt(queueCapacity);
+    Preconditions.checkArgument(totalCapacity > 0, "Lineage sink queue 
capacity must be positive");
+    int capacityPerSink = Math.max(1, totalCapacity / sinks.size());
 
     eventListenerConfigs.put(
         EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES, String.join(",", 
sinks));
diff --git 
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
index 6ccf6ac5d2..26918eaf26 100644
--- 
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
+++ 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
@@ -91,6 +91,40 @@ public class TestLineageSinkManager {
     Assertions.assertEquals("b", configs.get("sink2.a"));
   }
 
+  @Test
+  public void testGenerateEventListenerConfigsEnforcesMinimumQueueCapacity() {
+    // Test case where integer division would result in 0 (2 / 3 = 0)
+    List<String> sinks = Arrays.asList("sinkA", "sinkB", "sinkC");
+    Map<String, String> lineageConfigs = new HashMap<>();
+    lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "2");
+
+    Map<String, String> eventListenerConfigs =
+        LineageSinkManager.transformToEventListenerConfigs(sinks, 
lineageConfigs);
+
+    sinks.forEach(
+        sink -> {
+          String capacityKey =
+              sink + "." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY;
+          String actualCapacity = eventListenerConfigs.get(capacityKey);
+
+          Assertions.assertEquals(
+              "1",
+              actualCapacity,
+              String.format("Queue capacity for %s should be at least 1", 
sink));
+        });
+  }
+
+  @Test
+  public void testInvalidQueueCapacityThrowsException() {
+    List<String> sinks = Arrays.asList("sinkA", "sinkB");
+    Map<String, String> lineageConfigs = new HashMap<>();
+    lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "0");
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> LineageSinkManager.transformToEventListenerConfigs(sinks, 
lineageConfigs));
+  }
+
   private void checkLineageSink(LineageSinkForTest sink) {
     Map<String, String> configs = sink.getConfigs();
     Assertions.assertTrue(configs.containsKey("name"));

Reply via email to