This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 3c3318805d16af3a40187ff67544f294c52b8019 Author: Hunter Lee <[email protected]> AuthorDate: Thu Mar 28 12:27:26 2019 -0700 TASK: Fix bug where JobDispatcher does not create UserContentStore for new jobs It was observed that there are multiple logic paths where a new job could get scheduled: 1. scheduleJobs() 2. processJobStatusUpdateAndAssignment(). When a job is being assigned by the latter, JobDispatcher would fail to create the UserContentStore for the job, causing all subsequent reads/writes to this UserContentStore to fail. This is a temporary fix and further refactoring of code paths would be required in order to consolidate where new jobs get scheduled. Changelist: 1. Add UserContentStore for jobs with null contexts --- .../java/org/apache/helix/task/JobDispatcher.java | 25 ++++++++++++++++++++-- .../main/java/org/apache/helix/task/TaskUtil.java | 2 +- .../org/apache/helix/task/WorkflowDispatcher.java | 24 +++++++++++++++++++-- 3 files changed, 46 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java index 596b54b..6530f3a 100644 --- a/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/JobDispatcher.java @@ -1,6 +1,24 @@ package org.apache.helix.task; -import com.google.common.base.Joiner; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.Collection; @@ -11,7 +29,6 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; -import org.apache.helix.AccessOption; import org.apache.helix.ZNRecord; import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider; import org.apache.helix.controller.stages.CurrentStateOutput; @@ -98,6 +115,10 @@ public class JobDispatcher extends AbstractTaskDispatcher { jobCtx = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW)); jobCtx.setStartTime(System.currentTimeMillis()); jobCtx.setName(jobName); + // This job's JobContext has not been created yet. 
Since we are creating a new JobContext + // here, we must also create its UserContentStore + TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobName, + new ZNRecord(TaskUtil.USER_CONTENT_NODE)); workflowCtx.setJobState(jobName, TaskState.IN_PROGRESS); } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java index 4565209..d15cf8f 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java @@ -315,7 +315,7 @@ public class TaskUtil { * @param workflowJobResource the name of workflow or job * @param record the initial data */ - protected static void createUserContent(HelixPropertyStore propertyStore, + protected static void createUserContent(HelixPropertyStore<ZNRecord> propertyStore, String workflowJobResource, ZNRecord record) { propertyStore.create(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowJobResource, TaskUtil.USER_CONTENT_NODE), record, AccessOption.PERSISTENT); diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index eec2e87..80d9afb 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -1,5 +1,24 @@ package org.apache.helix.task; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + import com.google.common.collect.Lists; import java.text.DateFormat; import java.text.SimpleDateFormat; @@ -305,11 +324,12 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { } // Set up job resource based on partitions from target resource + + // Create the UserContentStore for the job first TaskUtil.createUserContent(_manager.getHelixPropertyStore(), jobResource, new ZNRecord(TaskUtil.USER_CONTENT_NODE)); - int numIndependentTasks = jobConfig.getTaskConfigMap().size(); - int numPartitions = numIndependentTasks; + int numPartitions = jobConfig.getTaskConfigMap().size(); if (numPartitions == 0) { IdealState targetIs = admin.getResourceIdealState(_manager.getClusterName(), jobConfig.getTargetResource());
