[
https://issues.apache.org/jira/browse/STORM-876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15015019#comment-15015019
]
ASF GitHub Bot commented on STORM-876:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/845#discussion_r45427932
--- Diff: storm-core/src/jvm/backtype/storm/blobstore/KeyVersion.java ---
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package backtype.storm.blobstore;
+
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TreeSet;
+import java.util.Map;
+import java.util.List;
+
+/**
+ * Class hands over the version of the key to be stored within the
zookeeper
+ */
+public class KeyVersion {
+ private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
+ private final String BLOBSTORE_SUBTREE="/blobstore";
+ private final String
BLOBSTORE_KEY_COUNTER_SUBTREE="/blobstorekeycounter";
+ private String key;
+ private NimbusInfo nimbusInfo;
+
+ public KeyVersion(String key, NimbusInfo nimbusInfo) {
+ this.key = key;
+ this.nimbusInfo = nimbusInfo;
+ }
+
+ public int getKeyVersion(Map conf) {
+ TreeSet<Integer> versions = new TreeSet<Integer>();
+ CuratorFramework zkClient = Utils.createZKClient(conf);
+ try {
+ // Key has not been created yet and it is the first time it is being
created
+ if(zkClient.checkExists().forPath(BLOBSTORE_SUBTREE + "/" + key) ==
null) {
+
zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
+
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(BLOBSTORE_KEY_COUNTER_SUBTREE +
"/" + key + "/" + 1);
+ return 1;
+ }
+
+ // When all nimbodes go down and one or few of them come up
+ // Unfortunately there might not be an exact way to know which one
contains the most updated blob
+ // if all go down which is unlikely. Hence there might be a need to
update the blob if all go down
+ List<String> stateInfoList =
zkClient.getChildren().forPath(BLOBSTORE_SUBTREE + "/" + key);
+ LOG.debug("stateInfoList {} stateInfoList {}", stateInfoList.size(),
stateInfoList);
+ if(stateInfoList.isEmpty()) {
+ return getKeyVersionCounterValue(zkClient, key);
+ }
+
+ LOG.debug("stateInfoSize {}", stateInfoList.size());
+ // In all other cases check for the latest version on the nimbus and
assign the version
+ // check if all are have same version, if not assign the highest
version
+ for (String stateInfo:stateInfoList) {
+
versions.add(Integer.parseInt(Utils.normalizeVersionInfo(stateInfo)[1]));
+ }
+
+ int currentCounter = getKeyVersionCounterValue(zkClient, key);
+ // This condition returns version when a nimbus crashes and comes up
+ if (!checkIfStateContainsCurrentNimbusHost(stateInfoList,
nimbusInfo) && !nimbusInfo.isLeader()) {
+ if (versions.last() < currentCounter) {
+ return currentCounter;
+ } else {
+ return currentCounter - 1;
+ }
+ }
+ // Condition checks for an update scenario
+ if (stateInfoList.size() >= 1 && versions.size() == 1) {
+ if (versions.first() < getKeyVersionCounterValue(zkClient, key)) {
+ incrementCounter(zkClient, key, currentCounter);
+ return currentCounter + 1;
+ } else {
+ incrementCounter(zkClient, key, currentCounter);
+ return versions.first() + 1;
+ }
+ }
+ } catch(Exception e) {
+ LOG.error("Exception {}", e);
+ } finally {
+ if (zkClient != null) {
+ zkClient.close();
+ }
+ }
+ return versions.last();
+ }
+
+ public boolean checkIfStateContainsCurrentNimbusHost(List<String>
stateInfoList, NimbusInfo nimbusInfo) {
+ boolean containsNimbusHost = false;
+ for(String stateInfo:stateInfoList) {
+ if(stateInfo.contains(nimbusInfo.getHost())) {
+ containsNimbusHost = true;
+ break;
+ }
+ }
+ return containsNimbusHost;
+ }
+
+ public void incrementCounter(CuratorFramework zkClient, String key, int
count) throws Exception {
--- End diff --
`private`?
> Dist Cache: Basic Functionality
> -------------------------------
>
> Key: STORM-876
> URL: https://issues.apache.org/jira/browse/STORM-876
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-core
> Reporter: Robert Joseph Evans
> Assignee: Robert Joseph Evans
> Attachments: DISTCACHE.md, DistributedCacheDesignDocument.pdf
>
>
> Basic functionality for the Dist Cache feature.
> As part of this a new API should be added to support uploading and
> downloading dist cache items. storm-core.ser, storm-conf.ser and storm.jar
> should be written into the blob store instead of residing locally. We need a
> default implementation of the blob store that does essentially what nimbus
> currently does and does not need anything extra. But having an HDFS backend
> too would be great for scalability and HA.
> The supervisor should provide a way to download and manage these blobs and
> provide a working directory for the worker process with symlinks to the
> blobs. It should also allow the blobs to be updated and switch the symlink
> atomically to point to the new blob once it is downloaded.
> All of this is already done by code internal to Yahoo! we are in the process
> of getting it ready to push back to open source shortly.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)