This is an automated email from the ASF dual-hosted git repository.

rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new af4699946 STORM-4154 - fix: Nimbus downtime during topology deployment (#3833)
af4699946 is described below

commit af4699946b1d2f599e5e08b5d3005aff0ee74a79
Author: Diogo Filipe Pinto Pereira <[email protected]>
AuthorDate: Tue Jan 14 16:50:04 2025 +0000

    STORM-4154 - fix: Nimbus downtime during topology deployment (#3833)
    
    * fix: Nimbus downtime during topology deployment
    
    * add tests for method
---
 .../org/apache/storm/daemon/nimbus/Nimbus.java     | 14 ++--
 .../org/apache/storm/daemon/nimbus/NimbusTest.java | 90 +++++++++++++++++++++-
 2 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 74610cbe9..18c5e5bdf 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -308,7 +308,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
         NIMBUS_SUBJECT.setReadOnly();
     }
-    
+
     private static final TopologyStateTransition NOOP_TRANSITION = (arg, 
nimbus, topoId, base) -> null;
     private static final TopologyStateTransition INACTIVE_TRANSITION = (arg, 
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
     private static final TopologyStateTransition ACTIVE_TRANSITION = (arg, 
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
@@ -3218,7 +3218,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
         ret.allComponents = new HashSet<>(ret.taskToComponent.values());
         return ret;
     }
-    
+
     @VisibleForTesting
     public boolean awaitLeadership(long timeout, TimeUnit timeUnit) throws 
InterruptedException {
         return leaderElector.awaitLeadership(timeout, timeUnit);
@@ -4109,6 +4109,8 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
                 state.setupBlob(key, ni, getVersionForKey(key, ni, zkClient));
             }
             LOG.debug("Created state in zookeeper {} {} {}", state, store, ni);
+        } catch (KeyNotFoundException e) {
+            LOG.warn("Key not found while creating state in zookeeper - key: " 
+ key, e);
         } catch (Exception e) {
             LOG.warn("Exception while creating state in zookeeper - key: " + 
key, e);
             if (e instanceof TException) {
@@ -5313,7 +5315,7 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
     private static class ClusterSummaryMetrics implements MetricSet {
         private static final String SUMMARY = "summary";
         private final Map<String, com.codahale.metrics.Metric> metrics = new 
HashMap<>();
-        
+
         public com.codahale.metrics.Metric put(String key, 
com.codahale.metrics.Metric value) {
             return metrics.put(MetricRegistry.name(SUMMARY, key), value);
         }
@@ -5323,12 +5325,12 @@ public class Nimbus implements Iface, Shutdownable, 
DaemonCommon {
             return metrics;
         }
     }
-    
+
     private class ClusterSummaryMetricSet implements Runnable {
         private static final int CACHING_WINDOW = 5;
-        
+
         private final ClusterSummaryMetrics clusterSummaryMetrics = new 
ClusterSummaryMetrics();
-        
+
         private final Function<String, Histogram> registerHistogram = (name) 
-> {
             //This histogram reflects the data distribution across only one 
ClusterSummary, i.e.,
             // data distribution across all entities of a type (e.g., data 
from all nimbus/topologies) at one moment.
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
index 35010fe18..792ec3aa1 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
@@ -25,24 +25,75 @@ import java.util.Set;
 
 import org.apache.storm.Config;
 import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeySequenceNumber;
+import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.cluster.IStormClusterState;
 import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
 import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.INimbus;
 import 
org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
 import 
org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
+import org.apache.storm.security.auth.IGroupMappingServiceProvider;
 import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.thrift.TException;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.utils.ServerUtils;
 import org.apache.storm.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class NimbusTest {
+    private static final String BLOB_FILE_KEY = "file-key";
+
+    @Mock
+    private StormMetricsRegistry metricRegistry;
+    @Mock
+    private INimbus iNimbus;
+    @Mock
+    private IStormClusterState stormClusterState;
+    @Mock
+    private NimbusInfo nimbusInfo;
+    @Mock
+    private LocalFsBlobStore localBlobStore;
+    @Mock
+    private ILeaderElector leaderElector;
+    @Mock
+    private IGroupMappingServiceProvider groupMapper;
+
+    private Nimbus nimbus;
+
+    @BeforeEach
+    public void setUp() throws Exception {
+        MockitoAnnotations.openMocks(this).close();
+
+        Map<String, Object> conf = 
Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
+        nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, 
localBlobStore, leaderElector, groupMapper, metricRegistry);
+    }
 
-public class NimbusTest {
     @Test
     public void testMemoryLoadLargerThanMaxHeapSize() {
         // Topology will not be able to be successfully scheduled: Config 
TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0,
@@ -112,4 +163,41 @@ public class NimbusTest {
         Map<String, Object> normalized = Nimbus.normalizeConf(conf, topoConf, 
topology);
         assertNull(normalized.get(Config.STORM_WORKERS_ARTIFACTS_DIR));
     }
+
+    @Test
+    void testCreateStateInZookeeper() throws TException {
+        nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+        verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), 
any());
+    }
+
+    @Test
+    void 
testCreateStateInZookeeperWithoutLocalFsBlobStoreInstanceShouldNotCreate() 
throws Exception {
+        BlobStore blobStore = mock(BlobStore.class);
+        Map<String, Object> conf = 
Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
+        nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo, 
blobStore, leaderElector, groupMapper, metricRegistry);
+
+        nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+        verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), 
eq(nimbusInfo), any());
+    }
+
+    @Test
+    void 
testCreateStateInZookeeperWhenFailToSetupBlobWithRuntimeExceptionThrowsRuntimeException()
 {
+        doThrow(new RuntimeException("Failed to setup 
blob")).when(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), 
any());
+
+        assertThrows(RuntimeException.class, () -> 
nimbus.createStateInZookeeper(BLOB_FILE_KEY));
+        verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), 
any());
+    }
+
+    @Test
+    void testCreateStateInZookeeperWhenKeyNotFoundHandlesException() throws 
Exception {
+        try (MockedConstruction<KeySequenceNumber> keySequenceNumber = 
mockConstruction(KeySequenceNumber.class, (mock, context) ->
+                when(mock.getKeySequenceNumber(any())).thenThrow(new 
KeyNotFoundException("Failed to setup blob")))) {
+            nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+            
verify(keySequenceNumber.constructed().get(0)).getKeySequenceNumber(any());
+            verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY), 
eq(nimbusInfo), any());
+        }
+    }
 }

Reply via email to