This is an automated email from the ASF dual-hosted git repository.
pbacsko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/yunikorn-core.git
The following commit(s) were added to refs/heads/master by this push:
new cce12571 [YUNIKORN-2681] Data race in TestCheckHealthStatusNotFound (#925)
cce12571 is described below
commit cce12571516402e0cf410b44fd3aeec2e4c023c1
Author: Peter Bacsko <[email protected]>
AuthorDate: Thu Aug 8 18:57:07 2024 +0200
[YUNIKORN-2681] Data race in TestCheckHealthStatusNotFound (#925)
Closes: #925
Signed-off-by: Peter Bacsko <[email protected]>
---
pkg/webservice/handlers.go | 37 +++++++++++-----------
pkg/webservice/handlers_test.go | 69 +++++++++++++++++++++--------------------
pkg/webservice/state_dump.go | 2 +-
pkg/webservice/webservice.go | 5 +--
4 files changed, 58 insertions(+), 55 deletions(-)
diff --git a/pkg/webservice/handlers.go b/pkg/webservice/handlers.go
index 2b6145db..2d7d5081 100644
--- a/pkg/webservice/handlers.go
+++ b/pkg/webservice/handlers.go
@@ -127,7 +127,7 @@ func getStackInfo(w http.ResponseWriter, r *http.Request) {
func getClusterInfo(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
- lists := schedulerContext.GetPartitionMapClone()
+ lists := schedulerContext.Load().GetPartitionMapClone()
clustersInfo := getClusterDAO(lists)
if err := json.NewEncoder(w).Encode(clustersInfo); err != nil {
buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
@@ -182,8 +182,9 @@ func buildJSONErrorResponse(w http.ResponseWriter, detail
string, code int) {
func getClusterJSON(partition *scheduler.PartitionContext) *dao.ClusterDAOInfo
{
clusterInfo := &dao.ClusterDAOInfo{}
- clusterInfo.StartTime = schedulerContext.GetStartTime().UnixNano()
- rmInfo := schedulerContext.GetRMInfoMapClone()
+ ctx := schedulerContext.Load()
+ clusterInfo.StartTime = ctx.GetStartTime().UnixNano()
+ rmInfo := ctx.GetRMInfoMapClone()
clusterInfo.RMBuildInformation = getRMBuildInformation(rmInfo)
clusterInfo.PartitionName =
common.GetPartitionNameWithoutClusterID(partition.Name)
clusterInfo.ClusterName = "kubernetes"
@@ -403,7 +404,7 @@ func getNodesDAO(entries []*objects.Node)
[]*dao.NodeDAOInfo {
// Deprecated - To be removed in next major release. Replaced with
getNodesUtilisations
func getNodeUtilisation(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(configs.DefaultPartition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(configs.DefaultPartition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusInternalServerError)
return
@@ -468,7 +469,7 @@ func getNodesUtilJSON(partition
*scheduler.PartitionContext, name string) *dao.N
func getNodeUtilisations(w http.ResponseWriter, r *http.Request) {
writeHeaders(w)
var result []*dao.PartitionNodesUtilDAOInfo
- for _, part := range schedulerContext.GetPartitionMapClone() {
+ for _, part := range schedulerContext.Load().GetPartitionMapClone() {
result = append(result, getPartitionNodesUtilJSON(part))
}
@@ -599,7 +600,7 @@ func getClusterConfig(w http.ResponseWriter, r
*http.Request) {
func getClusterConfigDAO() *dao.ConfigDAOInfo {
// merge core config with extra config
conf := dao.ConfigDAOInfo{
- SchedulerConfig:
configs.ConfigContext.Get(schedulerContext.GetPolicyGroup()),
+ SchedulerConfig:
configs.ConfigContext.Get(schedulerContext.Load().GetPolicyGroup()),
Extra: configs.GetConfigMap(),
DeadlockDetectionEnabled: locking.IsTrackingEnabled(),
DeadlockTimeoutSeconds: locking.GetDeadlockTimeoutSeconds(),
@@ -612,7 +613,7 @@ func checkHealthStatus(w http.ResponseWriter, r
*http.Request) {
writeHeaders(w)
// Fetch last healthCheck result
- result := schedulerContext.GetLastHealthCheckResult()
+ result := schedulerContext.Load().GetLastHealthCheckResult()
if result != nil {
if !result.Healthy {
log.Log(log.SchedHealth).Error("Scheduler is not
healthy", zap.Any("health check info", *result))
@@ -634,7 +635,7 @@ func checkHealthStatus(w http.ResponseWriter, r
*http.Request) {
func getPartitions(w http.ResponseWriter, _ *http.Request) {
writeHeaders(w)
- lists := schedulerContext.GetPartitionMapClone()
+ lists := schedulerContext.Load().GetPartitionMapClone()
partitionsInfo := getPartitionInfoDAO(lists)
if err := json.NewEncoder(w).Encode(partitionsInfo); err != nil {
buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
@@ -650,7 +651,7 @@ func getPartitionQueues(w http.ResponseWriter, r
*http.Request) {
}
partitionName := vars.ByName("partition")
var partitionQueuesDAOInfo dao.PartitionQueueDAOInfo
- var partition =
schedulerContext.GetPartitionWithoutClusterID(partitionName)
+ var partition =
schedulerContext.Load().GetPartitionWithoutClusterID(partitionName)
if partition != nil {
partitionQueuesDAOInfo = partition.GetPartitionQueues()
} else {
@@ -670,7 +671,7 @@ func getPartitionQueue(w http.ResponseWriter, r
*http.Request) {
return
}
partition := vars.ByName("partition")
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
@@ -705,7 +706,7 @@ func getPartitionNodes(w http.ResponseWriter, r
*http.Request) {
return
}
partition := vars.ByName("partition")
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext != nil {
nodesDao := getNodesDAO(partitionContext.GetNodes())
if err := json.NewEncoder(w).Encode(nodesDao); err != nil {
@@ -724,7 +725,7 @@ func getPartitionNode(w http.ResponseWriter, r
*http.Request) {
return
}
partition := vars.ByName("partition")
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext != nil {
nodeID := vars.ByName("node")
node := partitionContext.GetNode(nodeID)
@@ -760,7 +761,7 @@ func getQueueApplications(w http.ResponseWriter, r
*http.Request) {
buildJSONErrorResponse(w, queueErr.Error(),
http.StatusBadRequest)
return
}
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
@@ -792,7 +793,7 @@ func getPartitionApplicationsByState(w http.ResponseWriter,
r *http.Request) {
partition := vars.ByName("partition")
appState := strings.ToLower(vars.ByName("state"))
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
@@ -846,7 +847,7 @@ func getApplication(w http.ResponseWriter, r *http.Request)
{
return
}
application := vars.ByName("application")
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
@@ -887,7 +888,7 @@ func getPartitionRules(w http.ResponseWriter, r
*http.Request) {
return
}
partition := vars.ByName("partition")
- partitionContext :=
schedulerContext.GetPartitionWithoutClusterID(partition)
+ partitionContext :=
schedulerContext.Load().GetPartitionWithoutClusterID(partition)
if partitionContext == nil {
buildJSONErrorResponse(w, PartitionDoesNotExists,
http.StatusNotFound)
return
@@ -1209,7 +1210,7 @@ func getEvents(w http.ResponseWriter, r *http.Request) {
records, lowestID, highestID := eventSystem.GetEventsFromID(start,
count)
eventDao := dao.EventRecordDAO{
- InstanceUUID: schedulerContext.GetUUID(),
+ InstanceUUID: schedulerContext.Load().GetUUID(),
LowestID: lowestID,
HighestID: highestID,
EventRecords: records,
@@ -1266,7 +1267,7 @@ func getStream(w http.ResponseWriter, r *http.Request) {
defer eventSystem.RemoveStream(stream)
if err := enc.Encode(dao.YunikornID{
- InstanceUUID: schedulerContext.GetUUID(),
+ InstanceUUID: schedulerContext.Load().GetUUID(),
}); err != nil {
buildJSONErrorResponse(w, err.Error(),
http.StatusInternalServerError)
return
diff --git a/pkg/webservice/handlers_test.go b/pkg/webservice/handlers_test.go
index 9bf55648..7cf045df 100644
--- a/pkg/webservice/handlers_test.go
+++ b/pkg/webservice/handlers_test.go
@@ -291,15 +291,15 @@ var (
// setup To take care of setting up config, cluster, partitions etc
func setup(t *testing.T, config string, partitionCount int)
*scheduler.PartitionContext {
- var err error
- schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup,
[]byte(config))
+ ctx, err := scheduler.NewClusterContext(rmID, policyGroup,
[]byte(config))
assert.NilError(t, err, "Error when load clusterInfo from config")
+ schedulerContext.Store(ctx)
- assert.Equal(t, partitionCount,
len(schedulerContext.GetPartitionMapClone()))
+ assert.Equal(t, partitionCount,
len(schedulerContext.Load().GetPartitionMapClone()))
// Check default partition
partitionName := common.GetNormalizedPartitionName("default", rmID)
- part := schedulerContext.GetPartition(partitionName)
+ part := schedulerContext.Load().GetPartition(partitionName)
assert.Equal(t, 0, len(part.GetApplications()))
return part
}
@@ -506,9 +506,9 @@ func TestContainerHistory(t *testing.T) {
}
func TestGetConfigYAML(t *testing.T) {
- var err error
- schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup,
[]byte(startConf))
+ ctx, err := scheduler.NewClusterContext(rmID, policyGroup,
[]byte(startConf))
assert.NilError(t, err, "Error when load clusterInfo from config")
+ schedulerContext.Store(ctx)
// No err check: new request always returns correctly
//nolint: errcheck
req, _ := http.NewRequest("GET", "", nil)
@@ -524,7 +524,7 @@ func TestGetConfigYAML(t *testing.T) {
assert.Assert(t, len(startConfSum) > 0, "checksum boundary not found")
// change the config
- err = schedulerContext.UpdateRMSchedulerConfig(rmID,
[]byte(updatedConf))
+ err = schedulerContext.Load().UpdateRMSchedulerConfig(rmID,
[]byte(updatedConf))
assert.NilError(t, err, "Error when updating clusterInfo from config")
configs.SetConfigMap(updatedExtraConf)
@@ -558,7 +558,7 @@ func TestGetConfigJSON(t *testing.T) {
assert.Equal(t, conf.Partitions[0].NodeSortPolicy.Type, "fair", "node
sort policy set incorrectly, not fair (json)")
// change the config
- err = schedulerContext.UpdateRMSchedulerConfig(rmID,
[]byte(updatedConf))
+ err = schedulerContext.Load().UpdateRMSchedulerConfig(rmID,
[]byte(updatedConf))
assert.NilError(t, err, "Error when updating clusterInfo from config")
configs.SetConfigMap(updatedExtraConf)
@@ -582,10 +582,10 @@ func TestGetClusterUtilJSON(t *testing.T) {
buildInfoMap["buildDate"] = "2006-01-02T15:04:05-0700"
buildInfoMap["buildVersion"] = "latest"
buildInfoMap["isPluginVersion"] = "false"
- schedulerContext.SetRMInfo(rmID, buildInfoMap)
+ schedulerContext.Load().SetRMInfo(rmID, buildInfoMap)
rmBuildInformationMaps := getRMBuildInformation(nil)
assert.Equal(t, 0, len(rmBuildInformationMaps))
- rmInfo := schedulerContext.GetRMInfoMapClone()
+ rmInfo := schedulerContext.Load().GetRMInfoMapClone()
assert.Equal(t, 1, len(rmInfo))
rmBuildInformationMaps = getRMBuildInformation(rmInfo)
assert.Equal(t, 1, len(rmBuildInformationMaps))
@@ -596,7 +596,7 @@ func TestGetClusterUtilJSON(t *testing.T) {
// Check test partitions
partitionName := common.GetNormalizedPartitionName("default", rmID)
- partition := schedulerContext.GetPartition(partitionName)
+ partition := schedulerContext.Load().GetPartition(partitionName)
assert.Equal(t, partitionName, partition.Name)
// new app to partition
appID := "appID-1"
@@ -893,11 +893,13 @@ func TestGetNodeUtilisations(t *testing.T) {
assert.Equal(t, len(partitionNodesUtilDAOInfo), 0)
// setup partitions
- schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup,
[]byte(configMultiPartitions))
+ ctx, err := scheduler.NewClusterContext(rmID, policyGroup,
[]byte(configMultiPartitions))
+ assert.NilError(t, err, "Error when load clusterInfo from config")
+ schedulerContext.Store(ctx)
assert.NilError(t, err, "Error when load clusterInfo from config")
- schedulerContext.GetPartition("default")
- defaultPartition :=
schedulerContext.GetPartition(common.GetNormalizedPartitionName("default",
rmID))
- gpuPartition :=
schedulerContext.GetPartition(common.GetNormalizedPartitionName("gpu", rmID))
+ schedulerContext.Load().GetPartition("default")
+ defaultPartition :=
schedulerContext.Load().GetPartition(common.GetNormalizedPartitionName("default",
rmID))
+ gpuPartition :=
schedulerContext.Load().GetPartition(common.GetNormalizedPartitionName("gpu",
rmID))
// add nodes to partitions
node1 := addNode(t, defaultPartition, "node-1",
resources.NewResourceFromMap(map[string]resources.Quantity{"memory": 10}))
@@ -957,7 +959,7 @@ func getNodesUtilByType(t *testing.T, nodesUtilList
[]*dao.NodesUtilDAOInfo, res
}
func TestPartitions(t *testing.T) {
- schedulerContext = &scheduler.ClusterContext{}
+ schedulerContext.Store(&scheduler.ClusterContext{})
var req *http.Request
req, err := http.NewRequest("GET", "/ws/v1/partitions",
strings.NewReader(""))
@@ -1001,7 +1003,7 @@ func TestPartitions(t *testing.T) {
app6 := addAndConfirmApplicationExists(t, partitionName,
defaultPartition, "app-6")
app6.SetState(objects.Failed.String())
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
// create test nodes
nodeRes :=
resources.NewResourceFromMap(map[string]resources.Quantity{siCommon.Memory:
500, siCommon.CPU: 500}).ToProto()
@@ -1079,7 +1081,7 @@ func TestMetricsNotEmpty(t *testing.T) {
func TestGetPartitionQueuesHandler(t *testing.T) {
setup(t, configTwoLevelQueues, 2)
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
tMaxResource, err :=
resources.NewResourceFromConf(map[string]string{"memory": "600000"})
assert.NilError(t, err)
@@ -1164,7 +1166,7 @@ func TestGetPartitionQueueHandler(t *testing.T) {
queueA := "root.a"
setup(t, configTwoLevelQueues, 2)
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
// test specific queue
var partitionQueueDao1 dao.PartitionQueueDAOInfo
@@ -1247,7 +1249,7 @@ func TestGetPartitionQueueHandler(t *testing.T) {
}
func TestGetClusterInfo(t *testing.T) {
- schedulerContext = &scheduler.ClusterContext{}
+ schedulerContext.Store(&scheduler.ClusterContext{})
resp := &MockResponseWriter{}
getClusterInfo(resp, nil)
var data []*dao.ClusterDAOInfo
@@ -1307,7 +1309,7 @@ func TestGetPartitionNodes(t *testing.T) {
err = partition.AddAllocation(allocs[0])
assert.NilError(t, err, "add alloc-2 should not have failed")
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
var req *http.Request
req, err = createRequest(t, "/ws/v1/partition/default/nodes",
map[string]string{"partition": partitionNameWithoutClusterID})
@@ -1417,7 +1419,7 @@ func TestGetQueueApplicationsHandler(t *testing.T) {
assert.NilError(t, err, "ask should have been added to app")
app.SetTimedOutPlaceholder(tg, 1)
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
var req *http.Request
req, err = createRequest(t, handlerURL+defaultQueue+handlerSuffix,
map[string]string{"partition": partitionNameWithoutClusterID, "queue":
defaultQueue})
@@ -1541,7 +1543,7 @@ func checkIllegalGetAppsRequest(t *testing.T, url string,
params httprouter.Para
func TestGetPartitionApplicationsByStateHandler(t *testing.T) {
defaultPartition := setup(t, configDefault, 1)
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
// add a new application
app1 := addApp(t, "app-1", defaultPartition, "root.default", false)
@@ -1622,7 +1624,7 @@ func TestGetApplicationHandler(t *testing.T) {
err := app.AddAllocationAsk(ask)
assert.NilError(t, err, "ask should have been added to app")
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
var req *http.Request
req, err = createRequest(t,
"/ws/v1/partition/default/queue/root.default/application/app-1",
map[string]string{"partition": partitionNameWithoutClusterID, "queue":
"root.default", "application": "app-1"})
@@ -1891,9 +1893,9 @@ func TestFullStateDumpPath(t *testing.T) {
}
configs.SetConfigMap(configMap)
- schedulerContext = prepareSchedulerContext(t)
+ prepareSchedulerContext(t)
- partitionContext := schedulerContext.GetPartitionMapClone()
+ partitionContext := schedulerContext.Load().GetPartitionMapClone()
context := partitionContext[normalizedPartitionName]
app := newApplication("appID", normalizedPartitionName, "root.default",
rmID, security.UserGroup{})
err := context.AddApplication(app)
@@ -2578,14 +2580,13 @@ func getEventRecordDao(t *testing.T, req *http.Request)
dao.EventRecordDAO {
return eventDao
}
-func prepareSchedulerContext(t *testing.T) *scheduler.ClusterContext {
+func prepareSchedulerContext(t *testing.T) {
config := []byte(configDefault)
var err error
- schedulerContext, err = scheduler.NewClusterContext(rmID, policyGroup,
config)
+ ctx, err := scheduler.NewClusterContext(rmID, policyGroup, config)
+ schedulerContext.Store(ctx)
assert.NilError(t, err, "Error when load clusterInfo from config")
- assert.Equal(t, 1, len(schedulerContext.GetPartitionMapClone()))
-
- return schedulerContext
+ assert.Equal(t, 1, len(schedulerContext.Load().GetPartitionMapClone()))
}
func prepareUserAndGroupContext(t *testing.T, config string) {
@@ -2612,7 +2613,7 @@ func prepareUserAndGroupContext(t *testing.T, config
string) {
app.AddAllocation(allocInfo)
assert.Assert(t, app.IsRunning(), "Application did not return running
state after alloc: %s", app.CurrentState())
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
}
func prepareEmptyUserGroupContext() {
@@ -2707,7 +2708,7 @@ func runHealthCheckTest(t *testing.T, expected
*dao.SchedulerHealthDAOInfo) {
func TestGetPartitionRuleHandler(t *testing.T) {
setup(t, configDefault, 1)
- NewWebApp(schedulerContext, nil)
+ NewWebApp(schedulerContext.Load(), nil)
// test partition not exists
req, err := createRequest(t, "/ws/v1/partition/default/placementrules",
map[string]string{"partition": "notexists"})
@@ -2737,7 +2738,7 @@ func TestGetPartitionRuleHandler(t *testing.T) {
assert.Equal(t, partitionRules[1].Name, types.Recovery)
// change the config: 3 rules, expect recovery also
- err = schedulerContext.UpdateRMSchedulerConfig(rmID,
[]byte(placementRuleConfig))
+ err = schedulerContext.Load().UpdateRMSchedulerConfig(rmID,
[]byte(placementRuleConfig))
assert.NilError(t, err, "Error when updating clusterInfo from config")
req, err = createRequest(t, "/ws/v1/partition/default/placementrules",
map[string]string{"partition": partitionNameWithoutClusterID})
assert.NilError(t, err, httpRequestError)
diff --git a/pkg/webservice/state_dump.go b/pkg/webservice/state_dump.go
index 24f769e6..5cb7efc0 100644
--- a/pkg/webservice/state_dump.go
+++ b/pkg/webservice/state_dump.go
@@ -64,7 +64,7 @@ func doStateDump(w io.Writer) error {
stateDump.Lock()
defer stateDump.Unlock()
- partitionContext := schedulerContext.GetPartitionMapClone()
+ partitionContext := schedulerContext.Load().GetPartitionMapClone()
records := imHistory.GetRecords()
zapConfig := yunikornLog.GetZapConfigs()
diff --git a/pkg/webservice/webservice.go b/pkg/webservice/webservice.go
index 330d11e5..57587630 100644
--- a/pkg/webservice/webservice.go
+++ b/pkg/webservice/webservice.go
@@ -22,6 +22,7 @@ import (
"context"
"errors"
"net/http"
+ "sync/atomic"
"time"
"github.com/julienschmidt/httprouter"
@@ -34,7 +35,7 @@ import (
)
var imHistory *history.InternalMetricsHistory
-var schedulerContext *scheduler.ClusterContext
+var schedulerContext atomic.Pointer[scheduler.ClusterContext]
type WebService struct {
httpServer *http.Server
@@ -82,7 +83,7 @@ func (m *WebService) StartWebApp() {
func NewWebApp(context *scheduler.ClusterContext, internalMetrics
*history.InternalMetricsHistory) *WebService {
m := &WebService{}
- schedulerContext = context
+ schedulerContext.Store(context)
imHistory = internalMetrics
return m
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]