This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 4e7a40e5a9 [#9305] Improvement(lineage): in 
LineageSinkManager:capacityPerSink has the possibility to be set to zero. 
(#9412)
4e7a40e5a9 is described below

commit 4e7a40e5a9a9f6a2437d5133bef1d22b45ab65f6
Author: Salmane Khalili <[email protected]>
AuthorDate: Wed Dec 10 02:54:52 2025 +0100

    [#9305] Improvement(lineage): in LineageSinkManager:capacityPerSink has the 
possibility to be set to zero. (#9412)
    
    ### What changes were proposed in this pull request?
    
    This PR adds a default value of 1 for integer divisions that result in 0
    in the generateEventListenerConfigs method.
    
    The fix uses parseInt to parse into an int primitive instead of an Integer
    object, since we are storing into an int, and Math.max(1, totalCapacity /
    sinks.size()) to guarantee a minimum capacityPerSink of 1.
    
    ### Why are the changes needed?
    
    Previously, integer division in generateEventListenerConfigs could
    produce a capacityPerSink of 0 when the total capacity is less than
    the number of sinks. This violates EventListenerConfig's explicit
    rule that the capacity must be larger than 0, which leads to an
    initialization failure when the configuration is loaded.
    
    Fix: #9305
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, this is purely internal
    
    ### How was this patch tested?
    
    Added unit test and ran the following:
    ./gradlew :common:compileJava
    ./gradlew :common:test
    ./gradlew :lineage:test
---
 .../gravitino/lineage/sink/LineageSinkManager.java |  5 +++-
 .../lineage/sink/TestLineageSinkManager.java       | 34 ++++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)

diff --git 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
index 19d07c4a93..ea30fb1029 100644
--- 
a/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
+++ 
b/lineage/src/main/java/org/apache/gravitino/lineage/sink/LineageSinkManager.java
@@ -79,7 +79,10 @@ public class LineageSinkManager implements Closeable {
     String queueCapacity = 
lineageConfigs.get(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY);
     Preconditions.checkArgument(
         StringUtils.isNotBlank(queueCapacity), "Lineage sink queue capacity is 
not set");
-    int capacityPerSink = Integer.valueOf(queueCapacity) / sinks.size();
+
+    int totalCapacity = Integer.parseInt(queueCapacity);
+    Preconditions.checkArgument(totalCapacity > 0, "Lineage sink queue 
capacity must be positive");
+    int capacityPerSink = Math.max(1, totalCapacity / sinks.size());
 
     eventListenerConfigs.put(
         EventListenerManager.GRAVITINO_EVENT_LISTENER_NAMES, String.join(",", 
sinks));
diff --git 
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
index 6ccf6ac5d2..26918eaf26 100644
--- 
a/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
+++ 
b/lineage/src/test/java/org/apache/gravitino/lineage/sink/TestLineageSinkManager.java
@@ -91,6 +91,40 @@ public class TestLineageSinkManager {
     Assertions.assertEquals("b", configs.get("sink2.a"));
   }
 
+  @Test
+  public void testGenerateEventListenerConfigsEnforcesMinimumQueueCapacity() {
+    // Test case where integer division would result in 0 (2 / 3 = 0)
+    List<String> sinks = Arrays.asList("sinkA", "sinkB", "sinkC");
+    Map<String, String> lineageConfigs = new HashMap<>();
+    lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "2");
+
+    Map<String, String> eventListenerConfigs =
+        LineageSinkManager.transformToEventListenerConfigs(sinks, 
lineageConfigs);
+
+    sinks.forEach(
+        sink -> {
+          String capacityKey =
+              sink + "." + 
EventListenerManager.GRAVITINO_EVENT_LISTENER_QUEUE_CAPACITY;
+          String actualCapacity = eventListenerConfigs.get(capacityKey);
+
+          Assertions.assertEquals(
+              "1",
+              actualCapacity,
+              String.format("Queue capacity for %s should be at least 1", 
sink));
+        });
+  }
+
+  @Test
+  public void testInvalidQueueCapacityThrowsException() {
+    List<String> sinks = Arrays.asList("sinkA", "sinkB");
+    Map<String, String> lineageConfigs = new HashMap<>();
+    lineageConfigs.put(LineageConfig.LINEAGE_SINK_QUEUE_CAPACITY, "0");
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> LineageSinkManager.transformToEventListenerConfigs(sinks, 
lineageConfigs));
+  }
+
   private void checkLineageSink(LineageSinkForTest sink) {
     Map<String, String> configs = sink.getConfigs();
     Assertions.assertTrue(configs.containsKey("name"));

Reply via email to