This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new af4699946 STORM-4154 - fix: Nimbus downtime during topology deployment (#3833)
af4699946 is described below
commit af4699946b1d2f599e5e08b5d3005aff0ee74a79
Author: Diogo Filipe Pinto Pereira <[email protected]>
AuthorDate: Tue Jan 14 16:50:04 2025 +0000
STORM-4154 - fix: Nimbus downtime during topology deployment (#3833)
* fix: Nimbus downtime during topology deployment
* add tests for method
---
.../org/apache/storm/daemon/nimbus/Nimbus.java | 14 ++--
.../org/apache/storm/daemon/nimbus/NimbusTest.java | 90 +++++++++++++++++++++-
2 files changed, 97 insertions(+), 7 deletions(-)
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 74610cbe9..18c5e5bdf 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -308,7 +308,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
NIMBUS_SUBJECT.getPrincipals().add(new NimbusPrincipal());
NIMBUS_SUBJECT.setReadOnly();
}
-
+
private static final TopologyStateTransition NOOP_TRANSITION = (arg,
nimbus, topoId, base) -> null;
private static final TopologyStateTransition INACTIVE_TRANSITION = (arg,
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.INACTIVE);
private static final TopologyStateTransition ACTIVE_TRANSITION = (arg,
nimbus, topoId, base) -> Nimbus.make(TopologyStatus.ACTIVE);
@@ -3218,7 +3218,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
ret.allComponents = new HashSet<>(ret.taskToComponent.values());
return ret;
}
-
+
@VisibleForTesting
public boolean awaitLeadership(long timeout, TimeUnit timeUnit) throws
InterruptedException {
return leaderElector.awaitLeadership(timeout, timeUnit);
@@ -4109,6 +4109,8 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
state.setupBlob(key, ni, getVersionForKey(key, ni, zkClient));
}
LOG.debug("Created state in zookeeper {} {} {}", state, store, ni);
+ } catch (KeyNotFoundException e) {
+ LOG.warn("Key not found while creating state in zookeeper - key: " + key, e);
} catch (Exception e) {
LOG.warn("Exception while creating state in zookeeper - key: " + key, e);
if (e instanceof TException) {
@@ -5313,7 +5315,7 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
private static class ClusterSummaryMetrics implements MetricSet {
private static final String SUMMARY = "summary";
private final Map<String, com.codahale.metrics.Metric> metrics = new
HashMap<>();
-
+
public com.codahale.metrics.Metric put(String key,
com.codahale.metrics.Metric value) {
return metrics.put(MetricRegistry.name(SUMMARY, key), value);
}
@@ -5323,12 +5325,12 @@ public class Nimbus implements Iface, Shutdownable, DaemonCommon {
return metrics;
}
}
-
+
private class ClusterSummaryMetricSet implements Runnable {
private static final int CACHING_WINDOW = 5;
-
+
private final ClusterSummaryMetrics clusterSummaryMetrics = new
ClusterSummaryMetrics();
-
+
private final Function<String, Histogram> registerHistogram = (name)
-> {
//This histogram reflects the data distribution across only one
ClusterSummary, i.e.,
// data distribution across all entities of a type (e.g., data
from all nimbus/topologies) at one moment.
diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
index 35010fe18..792ec3aa1 100644
--- a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
+++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/NimbusTest.java
@@ -25,24 +25,75 @@ import java.util.Set;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
+import org.apache.storm.blobstore.BlobStore;
+import org.apache.storm.blobstore.KeySequenceNumber;
+import org.apache.storm.blobstore.LocalFsBlobStore;
+import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.generated.KeyNotFoundException;
import org.apache.storm.generated.StormTopology;
+import org.apache.storm.metric.StormMetricsRegistry;
+import org.apache.storm.nimbus.ILeaderElector;
+import org.apache.storm.nimbus.NimbusInfo;
+import org.apache.storm.scheduler.INimbus;
import org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategyOld;
import org.apache.storm.scheduler.resource.strategies.scheduling.RoundRobinResourceAwareStrategy;
+import org.apache.storm.security.auth.IGroupMappingServiceProvider;
import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.thrift.TException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.ServerUtils;
import org.apache.storm.utils.Time;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class NimbusTest {
+ private static final String BLOB_FILE_KEY = "file-key";
+
+ @Mock
+ private StormMetricsRegistry metricRegistry;
+ @Mock
+ private INimbus iNimbus;
+ @Mock
+ private IStormClusterState stormClusterState;
+ @Mock
+ private NimbusInfo nimbusInfo;
+ @Mock
+ private LocalFsBlobStore localBlobStore;
+ @Mock
+ private ILeaderElector leaderElector;
+ @Mock
+ private IGroupMappingServiceProvider groupMapper;
+
+ private Nimbus nimbus;
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ MockitoAnnotations.openMocks(this).close();
+
+ Map<String, Object> conf =
Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
+ nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo,
localBlobStore, leaderElector, groupMapper, metricRegistry);
+ }
-public class NimbusTest {
@Test
public void testMemoryLoadLargerThanMaxHeapSize() {
// Topology will not be able to be successfully scheduled: Config
TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB=128.0 < 129.0,
@@ -112,4 +163,41 @@ public class NimbusTest {
Map<String, Object> normalized = Nimbus.normalizeConf(conf, topoConf,
topology);
assertNull(normalized.get(Config.STORM_WORKERS_ARTIFACTS_DIR));
}
+
+ @Test
+ void testCreateStateInZookeeper() throws TException {
+ nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+ verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo),
any());
+ }
+
+ @Test
+ void testCreateStateInZookeeperWithoutLocalFsBlobStoreInstanceShouldNotCreate() throws Exception {
+ BlobStore blobStore = mock(BlobStore.class);
+ Map<String, Object> conf =
Map.of(DaemonConfig.NIMBUS_MONITOR_FREQ_SECS, 10);
+ nimbus = new Nimbus(conf, iNimbus, stormClusterState, nimbusInfo,
blobStore, leaderElector, groupMapper, metricRegistry);
+
+ nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+ verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY),
eq(nimbusInfo), any());
+ }
+
+ @Test
+ void testCreateStateInZookeeperWhenFailToSetupBlobWithRuntimeExceptionThrowsRuntimeException() {
+ doThrow(new RuntimeException("Failed to setup blob")).when(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo), any());
+
+ assertThrows(RuntimeException.class, () ->
nimbus.createStateInZookeeper(BLOB_FILE_KEY));
+ verify(stormClusterState).setupBlob(eq(BLOB_FILE_KEY), eq(nimbusInfo),
any());
+ }
+
+ @Test
+ void testCreateStateInZookeeperWhenKeyNotFoundHandlesException() throws
Exception {
+ try (MockedConstruction<KeySequenceNumber> keySequenceNumber = mockConstruction(KeySequenceNumber.class, (mock, context) ->
+ when(mock.getKeySequenceNumber(any())).thenThrow(new KeyNotFoundException("Failed to setup blob")))) {
+ nimbus.createStateInZookeeper(BLOB_FILE_KEY);
+
+ verify(keySequenceNumber.constructed().get(0)).getKeySequenceNumber(any());
+ verify(stormClusterState, never()).setupBlob(eq(BLOB_FILE_KEY),
eq(nimbusInfo), any());
+ }
+ }
}