Github user danny0405 commented on a diff in the pull request:
https://github.com/apache/storm/pull/2433#discussion_r158713978
--- Diff:
storm-client/src/jvm/org/apache/storm/assignments/InMemoryAssignmentBackend.java
---
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.assignments;
+
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.storm.cluster.ClusterUtils;
+import org.apache.storm.generated.Assignment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An assignment backend which will keep all assignments and id-info in
memory. Only used if no backend is specified internal.
+ */
+public class InMemoryAssignmentBackend implements ILocalAssignmentsBackend
{
+ private static final Logger LOG =
LoggerFactory.getLogger(InMemoryAssignmentBackend.class);
+
+ protected Map<String, Assignment> idToAssignment;
+ protected Map<String, String> idToName;
+ protected Map<String, String> nameToId;
+ private volatile boolean isSynchronized = false;
+
+ public InMemoryAssignmentBackend() {}
+
+ @Override
+ public boolean isSynchronized() {
+ return this.isSynchronized;
+ }
+
+ @Override
+ public void setSynchronized() {
+ this.isSynchronized = true;
+ }
+
+ @Override
+ public void prepare(Map conf) {
+ // do nothing for conf now
+ this.idToAssignment = new ConcurrentHashMap<>();
+ this.idToName = new ConcurrentHashMap<>();
+ this.nameToId = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void keepOrUpdateAssignment(String stormId, Assignment
assignment) {
+ this.idToAssignment.put(stormId, assignment);
+ }
+
+ @Override
+ public Assignment getAssignment(String stormId) {
+ return this.idToAssignment.get(stormId);
+ }
+
+ @Override
+ public void removeAssignment(String stormId) {
+ this.idToAssignment.remove(stormId);
+ }
+
+ @Override
+ public List<String> assignments() {
+ if(idToAssignment == null) {
+ return new ArrayList<>();
+ }
+ List<String> ret = new ArrayList<>();
+ ret.addAll(this.idToAssignment.keySet());
+ return ret;
+ }
+
+ @Override
+ public Map<String, Assignment> assignmentsInfo() {
+ Map<String, Assignment> ret = new HashMap<>();
+ ret.putAll(this.idToAssignment);
+
+ return ret;
+ }
+
+ @Override
+ public void syncRemoteAssignments(Map<String, byte[]> remote) {
+ Map<String, Assignment> tmp = new ConcurrentHashMap<>();
--- End diff --
Yeah, when the supervisor syncs the assignments, the master may change them at
the same time.
---