Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java?rev=1156404&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexRangeBalancer.java Wed Aug 10 23:57:25 2011 @@ -0,0 +1,76 @@ +/* + * Licensed to Yahoo! under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Yahoo! licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import java.util.Map; +import java.util.NavigableMap; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.json.JSONArray; + +/** + * Interface to determine your own balancing of vertex ranges + * among the workers. 
+ * + * @param <I> vertex id + * @param <V> vertex data + * @param <E> edge data + * @param <M> message data + */ +@SuppressWarnings("rawtypes") +public interface BasicVertexRangeBalancer<I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> { + /** + * Get the upcoming superstep number (since this happens prior to the + * computation of the superstep). + * + * @return the upcoming superstep + */ + long getSuperstep(); + + /** + * Get the last determined VertexRanges for the index type for the previous + * superstep. If this is the first superstep, then it was last determined + * by the loading. + * + * @return map containing last superstep's vertex ranges determination + */ + NavigableMap<I, VertexRange<I, V, E, M>> getPrevVertexRangeMap(); + + /** + * Get the available workers and their associated hostname and port + * information. This map can be used to assign the + * vertices in rebalance(). + * + * @return Map of workers to their hostname and port information in a + * JSONArray + */ + Map<String, JSONArray> getWorkerHostnamePortMap(); + + /** + * User needs to implement this function and return the new vertex range + * assignments. + * + * @return Map containing current superstep's vertex ranges determination + */ + NavigableMap<I, VertexRange<I, V, E, M>> rebalance(); +}
Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java?rev=1156404&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BasicVertexResolver.java Wed Aug 10 23:57:25 2011 @@ -0,0 +1,59 @@ +/* + * Licensed to Yahoo! under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Yahoo! licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Handles all the situations that can arise upon creation/removal of + * vertices and edges. + */ +@SuppressWarnings("rawtypes") +public interface BasicVertexResolver< + I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> { + /** + * A vertex may have been removed, created zero or more times and had + * zero or more messages sent to it. 
This method will handle all situations + * excluding the normal case (a vertex already exists and has zero or more + * messages sent to it). + * + * @param vertex Original vertex or null if none + * @param vertexChanges Changes that happened to this vertex or null if none + * @param msgList List of messages received in the last superstep or null + * if none + * @return Vertex to be returned; if null and a vertex currently exists, + * it will be removed + */ + BasicVertex<I, V, E, M> resolve(BasicVertex<I, V, E, M> vertex, + VertexChanges<I, V, E, M> vertexChanges, + List<M> msgList); + + /** + * Create a default vertex that can be used to return from resolve(). + * + * @return Newly instantiated vertex. + */ + MutableVertex<I, V, E, M> instantiateVertex(); +} Added: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java?rev=1156404&view=auto ============================================================================== --- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java (added) +++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspService.java Wed Aug 10 23:57:25 2011 @@ -0,0 +1,1101 @@ +/* + * Licensed to Yahoo! under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Yahoo! licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.giraph.graph; + +import java.io.IOException; +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.net.InetAddress; +import java.net.UnknownHostException; + +import org.apache.log4j.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.mapreduce.Mapper; + +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs.Ids; +import org.json.JSONArray; +import org.json.JSONException; +import org.json.JSONObject; + +import org.apache.giraph.bsp.CentralizedService; +import org.apache.giraph.zk.BspEvent; +import org.apache.giraph.zk.PredicateLock; +import org.apache.giraph.zk.ZooKeeperExt; + +/** + * Zookeeper-based implementation of {@link CentralizedService}. + */ +@SuppressWarnings("rawtypes") +public abstract class BspService < + I extends WritableComparable, + V extends Writable, + E extends Writable, + M extends Writable> + implements Watcher, CentralizedService<I, V, E, M> { + /** Private ZooKeeper instance that implements the service */ + private final ZooKeeperExt zk; + /** Has the Connection occurred? 
*/ + private final BspEvent connectedEvent = new PredicateLock(); + /** Has worker registration changed (either healthy or unhealthy) */ + private final BspEvent workerHealthRegistrationChanged = + new PredicateLock(); + /** InputSplits are ready for consumption by workers */ + private final BspEvent inputSplitsAllReadyChanged = + new PredicateLock(); + /** InputSplit reservation or finished notification and synchronization */ + private final BspEvent inputSplitsStateChanged = + new PredicateLock(); + /** Are the worker assignments of vertex ranges ready? */ + private final BspEvent vertexRangeAssignmentsReadyChanged = + new PredicateLock(); + /** Have the vertex range exchange children changed? */ + private final BspEvent vertexRangeExchangeChildrenChanged = + new PredicateLock(); + /** Are the vertex range exchanges done? */ + private final BspEvent vertexRangeExchangeFinishedChanged = + new PredicateLock(); + /** Application attempt changed */ + private final BspEvent applicationAttemptChanged = + new PredicateLock(); + /** Superstep finished synchronization */ + private final BspEvent superstepFinished = + new PredicateLock(); + /** Master election changed for any waited on attempt */ + private final BspEvent masterElectionChildrenChanged = + new PredicateLock(); + /** Cleaned up directory children changed*/ + private final BspEvent cleanedUpChildrenChanged = + new PredicateLock(); + /** Registered list of BspEvents */ + private final List<BspEvent> registeredBspEvents = + new ArrayList<BspEvent>(); + /** Configuration of the job*/ + private final Configuration conf; + /** Job context (mainly for progress) */ + private final Mapper<?, ?, ?, ?>.Context context; + /** Cached superstep (from ZooKeeper) */ + private long cachedSuperstep = UNSET_SUPERSTEP; + /** Restarted from a checkpoint (manual or automatic) */ + private long restartedSuperstep = UNSET_SUPERSTEP; + /** Cached application attempt (from ZooKeeper) */ + private long cachedApplicationAttempt = 
UNSET_APPLICATION_ATTEMPT; + /** Job id, to ensure uniqueness */ + private final String jobId; + /** Task partition, to ensure uniqueness */ + private final int taskPartition; + /** My hostname */ + private final String hostname; + /** Combination of hostname '_' partition (unique id) */ + private final String hostnamePartitionId; + /** Mapper that will do the graph computation */ + private final GraphMapper<I, V, E, M> graphMapper; + /** Class logger */ + private static final Logger LOG = Logger.getLogger(BspService.class); + /** File system */ + private final FileSystem fs; + /** Used to call pre/post application/superstep methods */ + private final Vertex<I, V, E, M> representativeVertex; + /** Checkpoint frequency */ + private int checkpointFrequency = -1; + /** Vertex range map based on the superstep below */ + private NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap = + new TreeMap<I, VertexRange<I, V, E, M>>(); + /** Vertex range set is based on this superstep */ + private long vertexRangeSuperstep = UNSET_SUPERSTEP; + /** Map of aggregators */ + private Map<String, Aggregator<Writable>> aggregatorMap = + new TreeMap<String, Aggregator<Writable>>(); + + /** Unset superstep */ + public static final long UNSET_SUPERSTEP = Long.MIN_VALUE; + /** Input superstep (superstep when loading the vertices happens) */ + public static final long INPUT_SUPERSTEP = -1; + /** Unset application attempt */ + public static final long UNSET_APPLICATION_ATTEMPT = Long.MIN_VALUE; + + public static final String BASE_DIR = "/_hadoopBsp"; + public static final String MASTER_JOB_STATE_NODE = "/_masterJobState"; + public static final String INPUT_SPLIT_DIR = "/_inputSplitsDir"; + public static final String INPUT_SPLIT_RESERVED_NODE = + "/_inputSplitReserved"; + public static final String INPUT_SPLIT_FINISHED_NODE = + "/_inputSplitFinished"; + public static final String INPUT_SPLITS_ALL_READY_NODE = + "/_inputSplitsAllReady"; + public static final String APPLICATION_ATTEMPTS_DIR 
= + "/_applicationAttemptsDir"; + public static final String MASTER_ELECTION_DIR = "/_masterElectionDir"; + public static final String SUPERSTEP_DIR = "/_superstepDir"; + public static final String MERGED_AGGREGATOR_DIR = + "/_mergedAggregatorDir"; + public static final String WORKER_HEALTHY_DIR = "/_workerHealthyDir"; + public static final String WORKER_UNHEALTHY_DIR = "/_workerUnhealthyDir"; + public static final String WORKER_FINISHED_DIR = "/_workerFinishedDir"; + public static final String VERTEX_RANGE_ASSIGNMENTS_DIR = + "/_vertexRangeAssignments"; + public static final String VERTEX_RANGE_EXCHANGE_DIR = + "/_vertexRangeExchangeDir"; + public static final String VERTEX_RANGE_EXCHANGED_FINISHED_NODE = + "/_vertexRangeExchangeFinished"; + public static final String SUPERSTEP_FINISHED_NODE = "/_superstepFinished"; + public static final String CLEANED_UP_DIR = "/_cleanedUpDir"; + + public static final String JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY = + "_aggregatorValueArrayKey"; + public static final String JSONOBJ_VERTEX_RANGE_STAT_ARRAY_KEY = + "_vertexRangeStatArrayKey"; + public static final String JSONOBJ_FINISHED_VERTICES_KEY = + "_verticesFinishedKey"; + public static final String JSONOBJ_NUM_VERTICES_KEY = "_numVerticesKey"; + public static final String JSONOBJ_NUM_EDGES_KEY = "_numEdgesKey"; + public static final String JSONOBJ_NUM_MESSAGES_KEY = "_numMsgsKey"; + public static final String JSONOBJ_HOSTNAME_ID_KEY = "_hostnameIdKey"; + public static final String JSONOBJ_MAX_VERTEX_INDEX_KEY = + "_maxVertexIndexKey"; + public static final String JSONOBJ_HOSTNAME_KEY = "_hostnameKey"; + public static final String JSONOBJ_PORT_KEY = "_portKey"; + public static final String JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY = + "_checkpointFilePrefixKey"; + public static final String JSONOBJ_PREVIOUS_HOSTNAME_KEY = + "_previousHostnameKey"; + public static final String JSONOBJ_PREVIOUS_PORT_KEY = "_previousPortKey"; + public static final String JSONOBJ_STATE_KEY = "_stateKey"; + 
public static final String JSONOBJ_APPLICATION_ATTEMPT_KEY = + "_applicationAttemptKey"; + public static final String JSONOBJ_SUPERSTEP_KEY = + "_superstepKey"; + public static final String AGGREGATOR_NAME_KEY = "_aggregatorNameKey"; + public static final String AGGREGATOR_CLASS_NAME_KEY = + "_aggregatorClassNameKey"; + public static final String AGGREGATOR_VALUE_KEY = "_aggregatorValueKey"; + + public static final String WORKER_SUFFIX = "_worker"; + public static final String MASTER_SUFFIX = "_master"; + + /** Path to the job's root */ + public final String BASE_PATH; + /** Path to the job state determined by the master (informative only) */ + public final String MASTER_JOB_STATE_PATH; + /** Path to the input splits written by the master */ + public final String INPUT_SPLIT_PATH; + /** Path to the input splits all ready to be processed by workers */ + public final String INPUT_SPLITS_ALL_READY_PATH; + /** Path to the application attempts) */ + public final String APPLICATION_ATTEMPTS_PATH; + /** Path to the cleaned up notifications */ + public final String CLEANED_UP_PATH; + /** Path to the checkpoint's root (including job id) */ + public final String CHECKPOINT_BASE_PATH; + /** Path to the master election path */ + public final String MASTER_ELECTION_PATH; + + /** + * Get the superstep from a ZooKeeper path + * + * @param path Path to parse for the superstep + */ + public static long getSuperstepFromPath(String path) { + int foundSuperstepStart = path.indexOf(SUPERSTEP_DIR); + if (foundSuperstepStart == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find " + SUPERSTEP_DIR + + "from " + path); + } + foundSuperstepStart += SUPERSTEP_DIR.length() + 1; + int endIndex = foundSuperstepStart + + path.substring(foundSuperstepStart).indexOf("/"); + if (endIndex == -1) { + throw new IllegalArgumentException( + "getSuperstepFromPath: Cannot find end of superstep from " + + path); + } + if (LOG.isDebugEnabled()) { + 
LOG.debug("getSuperstepFromPath: Got path=" + path + + ", start=" + foundSuperstepStart + ", end=" + endIndex); + } + return Long.parseLong(path.substring(foundSuperstepStart, endIndex)); + } + + /** + * Get the hostname and id from a "healthy" worker path + */ + public static String getHealthyHostnameIdFromPath(String path) { + int foundWorkerHealthyStart = path.indexOf(WORKER_HEALTHY_DIR); + if (foundWorkerHealthyStart == -1) { + throw new IllegalArgumentException( + "getHealthyHostnameidFromPath: Couldn't find " + + WORKER_HEALTHY_DIR + " from " + path); + } + foundWorkerHealthyStart += WORKER_HEALTHY_DIR.length(); + return path.substring(foundWorkerHealthyStart); + } + + /** + * Generate the base superstep directory path for a given application + * attempt + * + * @param attempt application attempt number + * @return directory path based on the an attempt + */ + final public String getSuperstepPath(long attempt) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + SUPERSTEP_DIR; + } + + /** + * Generate the worker "healthy" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getWorkerHealthyPath(long attempt, long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_HEALTHY_DIR; + } + + /** + * Generate the worker "unhealthy" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getWorkerUnhealthyPath(long attempt, long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_UNHEALTHY_DIR; + } + + /** + * Generate the worker "finished" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return 
directory path based on the a superstep + */ + final public String getWorkerFinishedPath(long attempt, long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + WORKER_FINISHED_DIR; + } + + /** + * Generate the "vertex range assignments" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getVertexRangeAssignmentsPath(long attempt, + long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_ASSIGNMENTS_DIR; + } + + /** + * Generate the "vertex range exchange" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getVertexRangeExchangePath(long attempt, + long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + VERTEX_RANGE_EXCHANGE_DIR; + } + + /** + * Generate the "vertex range exchange finished" directory path for + * a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getVertexRangeExchangeFinishedPath(long attempt, + long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + + VERTEX_RANGE_EXCHANGED_FINISHED_NODE; + } + + /** + * Generate the merged aggregator directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getMergedAggregatorPath(long attempt, long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + MERGED_AGGREGATOR_DIR; + } + + /** + * Generate the 
"superstep finished" directory path for a superstep + * + * @param attempt application attempt number + * @param superstep superstep to use + * @return directory path based on the a superstep + */ + final public String getSuperstepFinishedPath(long attempt, long superstep) { + return APPLICATION_ATTEMPTS_PATH + "/" + attempt + + SUPERSTEP_DIR + "/" + superstep + SUPERSTEP_FINISHED_NODE; + } + + /** + * Generate the base superstep directory path for a given application + * attempt + * + * @param superstep Superstep to use + * @return Directory path based on the a superstep + */ + final public String getCheckpointBasePath(long superstep) { + return CHECKPOINT_BASE_PATH + "/" + superstep; + } + + /** If at the end of a checkpoint file, indicates metadata */ + public final String CHECKPOINT_METADATA_POSTFIX = ".metadata"; + + /** + * If at the end of a checkpoint file, indicates vertices, edges, + * messages, etc. + */ + public final String CHECKPOINT_VERTICES_POSTFIX = ".vertices"; + + /** + * If at the end of a checkpoint file, indicates metadata and data is valid + * for the same filenames without .valid + */ + public final String CHECKPOINT_VALID_POSTFIX = ".valid"; + + /** + * If at the end of a checkpoint file, indicates the stitched checkpoint + * file prefixes. A checkpoint is not valid if this file does not exist. 
+ */ + public static final String CHECKPOINT_FINALIZED_POSTFIX = ".finalized"; + + /** + * Get the checkpoint from a finalized checkpoint path + * + * @param finalizedPath Path of the finalized checkpoint + * @return Superstep referring to a checkpoint of the finalized path + */ + public static long getCheckpoint(Path finalizedPath) { + if (!finalizedPath.getName().endsWith(CHECKPOINT_FINALIZED_POSTFIX)) { + throw new InvalidParameterException( + "getCheckpoint: " + finalizedPath + "Doesn't end in " + + CHECKPOINT_FINALIZED_POSTFIX); + } + String checkpointString = + finalizedPath.getName().replace(CHECKPOINT_FINALIZED_POSTFIX, ""); + return Long.parseLong(checkpointString); + } + + /** + * Get the ZooKeeperExt instance. + * + * @return ZooKeeperExt instance. + */ + final public ZooKeeperExt getZkExt() { + return zk; + } + + @Override + final public long getRestartedSuperstep() { + return restartedSuperstep; + } + + /** + * Set the restarted superstep + * + * @param superstep Set the manually restarted superstep + */ + final public void setRestartedSuperstep(long superstep) { + if (superstep < INPUT_SUPERSTEP) { + throw new IllegalArgumentException( + "setRestartedSuperstep: Bad argument " + superstep); + } + restartedSuperstep = superstep; + } + + /** + * Should checkpoint on this superstep? If checkpointing, always + * checkpoint the first user superstep. If restarting, the first + * checkpoint is after the frequency has been met. 
+ * + * @param superstep Decide if checkpointing no this superstep + * @return True if this superstep should be checkpointed, false otherwise + */ + final public boolean checkpointFrequencyMet(long superstep) { + if (checkpointFrequency == 0) { + return false; + } + long firstCheckpoint = INPUT_SUPERSTEP + 1; + if (getRestartedSuperstep() != UNSET_SUPERSTEP) { + firstCheckpoint = getRestartedSuperstep() + checkpointFrequency; + } + if (superstep < firstCheckpoint) { + return false; + } else if (((superstep - firstCheckpoint) % checkpointFrequency) == 0) { + return true; + } else { + return false; + } + } + + /** + * Get the file system + * + * @return file system + */ + final public FileSystem getFs() { + return fs; + } + + final public Configuration getConfiguration() { + return conf; + } + + final public Mapper<?, ?, ?, ?>.Context getContext() { + return context; + } + + final public String getHostname() { + return hostname; + } + + final public String getHostnamePartitionId() { + return hostnamePartitionId; + } + + final public int getTaskPartition() { + return taskPartition; + } + + final public GraphMapper<I, V, E, M> getGraphMapper() { + return graphMapper; + } + + final public BspEvent getWorkerHealthRegistrationChangedEvent() { + return workerHealthRegistrationChanged; + } + + final public BspEvent getInputSplitsAllReadyEvent() { + return inputSplitsAllReadyChanged; + } + + final public BspEvent getInputSplitsStateChangedEvent() { + return inputSplitsStateChanged; + } + + final public BspEvent getVertexRangeAssignmentsReadyChangedEvent() { + return vertexRangeAssignmentsReadyChanged; + } + + final public BspEvent getVertexRangeExchangeChildrenChangedEvent() { + return vertexRangeExchangeChildrenChanged; + } + + final public BspEvent getVertexRangeExchangeFinishedChangedEvent() { + return vertexRangeExchangeFinishedChanged; + } + + final public BspEvent getApplicationAttemptChangedEvent() { + return applicationAttemptChanged; + } + + final public BspEvent 
getSuperstepFinishedEvent() { + return superstepFinished; + } + + + final public BspEvent getMasterElectionChildrenChangedEvent() { + return masterElectionChildrenChanged; + } + + final public BspEvent getCleanedUpChildrenChangedEvent() { + return cleanedUpChildrenChanged; + } + + /** + * Get the master commanded job state as a JSONObject. Also sets the + * watches to see if the master commanded job state changes. + * + * @return Last job state or null if none + * @throws RuntimeException if ZooKeeper access fails or the stored state + * cannot be parsed as JSON + */ + final public JSONObject getJobState() { + try { + getZkExt().createExt(MASTER_JOB_STATE_PATH, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getJobState: Job state already exists (" + + MASTER_JOB_STATE_PATH + ")"); + } catch (Exception e) { + throw new RuntimeException(e); + } + String jobState = null; + try { + List<String> childList = + getZkExt().getChildrenExt( + MASTER_JOB_STATE_PATH, true, true, true); + if (childList.isEmpty()) { + return null; + } + jobState = + new String(getZkExt().getData( + childList.get(childList.size() - 1), true, null)); + } catch (KeeperException.NoNodeException e) { + LOG.info("getJobState: Job state path is empty! 
- " + + MASTER_JOB_STATE_PATH); + /* No state node to read: report "none" rather than parsing null */ + return null; + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + return new JSONObject(jobState); + } catch (JSONException e) { + throw new RuntimeException( + "getJobState: Failed to parse job state " + jobState, e); + } + } + + public BspService(String serverPortList, + int sessionMsecTimeout, + Mapper<?, ?, ?, ?>.Context context, + GraphMapper<I, V, E, M> graphMapper) { + registerBspEvent(connectedEvent); + registerBspEvent(workerHealthRegistrationChanged); + registerBspEvent(inputSplitsAllReadyChanged); + registerBspEvent(inputSplitsStateChanged); + registerBspEvent(vertexRangeAssignmentsReadyChanged); + registerBspEvent(vertexRangeExchangeChildrenChanged); + registerBspEvent(vertexRangeExchangeFinishedChanged); + registerBspEvent(applicationAttemptChanged); + registerBspEvent(superstepFinished); + registerBspEvent(masterElectionChildrenChanged); + registerBspEvent(cleanedUpChildrenChanged); + + this.context = context; + this.graphMapper = graphMapper; + this.conf = context.getConfiguration(); + this.jobId = conf.get("mapred.job.id", "Unknown Job"); + this.taskPartition = conf.getInt("mapred.task.partition", -1); + this.restartedSuperstep = conf.getLong(GiraphJob.RESTART_SUPERSTEP, + UNSET_SUPERSTEP); + this.cachedSuperstep = restartedSuperstep; + if ((restartedSuperstep != UNSET_SUPERSTEP) && + (restartedSuperstep < 0)) { + throw new IllegalArgumentException( + "BspService: Invalid superstep to restart - " + + restartedSuperstep); + } + try { + this.hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + throw new RuntimeException(e); + } + this.hostnamePartitionId = hostname + "_" + getTaskPartition(); + + this.representativeVertex = + BspUtils.<I, V, E, M>createVertex(getConfiguration()); + + this.checkpointFrequency = + conf.getInt(GiraphJob.CHECKPOINT_FREQUENCY, + GiraphJob.CHECKPOINT_FREQUENCY_DEFAULT); + + BASE_PATH = BASE_DIR + "/" + jobId; + MASTER_JOB_STATE_PATH = BASE_PATH + 
MASTER_JOB_STATE_NODE; + INPUT_SPLIT_PATH = BASE_PATH + INPUT_SPLIT_DIR; + INPUT_SPLITS_ALL_READY_PATH = BASE_PATH + INPUT_SPLITS_ALL_READY_NODE; + APPLICATION_ATTEMPTS_PATH = BASE_PATH + APPLICATION_ATTEMPTS_DIR; + CLEANED_UP_PATH = BASE_PATH + CLEANED_UP_DIR; + CHECKPOINT_BASE_PATH = + getConfiguration().get( + GiraphJob.CHECKPOINT_DIRECTORY, + GiraphJob.CHECKPOINT_DIRECTORY_DEFAULT + "/" + getJobId()); + MASTER_ELECTION_PATH = BASE_PATH + MASTER_ELECTION_DIR; + if (LOG.isInfoEnabled()) { + LOG.info("BspService: Connecting to ZooKeeper with job " + jobId + + ", " + getTaskPartition() + " on " + serverPortList); + } + try { + this.zk = new ZooKeeperExt(serverPortList, sessionMsecTimeout, this); + connectedEvent.waitForever(); + this.fs = FileSystem.get(getConfiguration()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Get the job id + * + * @return job id + */ + final public String getJobId() { + return jobId; + } + + final public BasicVertex<I, V, E, M> getRepresentativeVertex() { + return representativeVertex; + } + + /** + * Get the latest application attempt and cache it. 
+ * + * @return the latest application attempt + */ + final public long getApplicationAttempt() { + if (cachedApplicationAttempt != UNSET_APPLICATION_ATTEMPT) { + return cachedApplicationAttempt; + } + try { + getZkExt().createExt(APPLICATION_ATTEMPTS_PATH, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + LOG.info("getApplicationAttempt: Node " + + APPLICATION_ATTEMPTS_PATH + " already exists!"); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + List<String> attemptList = + getZkExt().getChildrenExt( + APPLICATION_ATTEMPTS_PATH, true, false, false); + if (attemptList.isEmpty()) { + cachedApplicationAttempt = 0; + } + else { + /* + * Compare numerically: as strings "9" sorts after "10", so a + * lexicographic max picks the wrong attempt from 10 onwards. + */ + long latestAttempt = Long.MIN_VALUE; + for (String attempt : attemptList) { + latestAttempt = + Math.max(latestAttempt, Long.parseLong(attempt)); + } + cachedApplicationAttempt = latestAttempt; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + return cachedApplicationAttempt; + } + + /** + * Get the latest superstep and cache it. + * + * @return the latest superstep + * @throws IllegalStateException if ZooKeeper fails or the call is + * interrupted + */ + final public long getSuperstep() { + if (cachedSuperstep != UNSET_SUPERSTEP) { + return cachedSuperstep; + } + String superstepPath = getSuperstepPath(getApplicationAttempt()); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + if (LOG.isInfoEnabled()) { + LOG.info("getSuperstep: Node " + + superstepPath + " already exists!"); + } + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch (InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + + List<String> superstepList; + try { + superstepList = + getZkExt().getChildrenExt(superstepPath, true, false, false); + } catch (KeeperException e) { + throw new IllegalStateException( + "getSuperstep: KeeperException", e); + } catch 
(InterruptedException e) { + throw new IllegalStateException( + "getSuperstep: InterruptedException", e); + } + if (superstepList.isEmpty()) { + cachedSuperstep = INPUT_SUPERSTEP; + } + else { + /* Numeric max: string ordering would rank "9" above "10" */ + long latestSuperstep = Long.MIN_VALUE; + for (String superstep : superstepList) { + latestSuperstep = + Math.max(latestSuperstep, Long.parseLong(superstep)); + } + cachedSuperstep = latestSuperstep; + } + + return cachedSuperstep; + } + + /** + * Increment the cached superstep. Shouldn't be the initial value anymore. + */ + final public void incrCachedSuperstep() { + if (cachedSuperstep == UNSET_SUPERSTEP) { + throw new IllegalStateException( + "incrSuperstep: Invalid unset cached superstep " + + UNSET_SUPERSTEP); + } + ++cachedSuperstep; + } + + /** + * Set the cached superstep (should only be used for loading checkpoints + * or recovering from failure). + * + * @param superstep will be used as the next superstep iteration + */ + final public void setCachedSuperstep(long superstep) { + cachedSuperstep = superstep; + } + + /** + * Set the cached application attempt (should only be used for restart from + * failure by the master) + * + * @param applicationAttempt Will denote the new application attempt + */ + final public void setApplicationAttempt(long applicationAttempt) { + cachedApplicationAttempt = applicationAttempt; + String superstepPath = getSuperstepPath(cachedApplicationAttempt); + try { + getZkExt().createExt(superstepPath, + null, + Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + } catch (KeeperException.NodeExistsException e) { + throw new IllegalArgumentException( + "setApplicationAttempt: Attempt already exists! - " + + superstepPath, e); + } catch (KeeperException e) { + throw new RuntimeException( + "setApplicationAttempt: KeeperException - " + + superstepPath, e); + } catch (InterruptedException e) { + throw new RuntimeException( + "setApplicationAttempt: InterruptedException - " + + superstepPath, e); + } + } + + /** + * Gets the storable vertex range map, bypasses the cache. Used by workers + * to dump the vertices into. 
+ * + * @return Actual map of max vertex range indices to vertex ranges + */ + public NavigableMap<I, VertexRange<I, V, E, M>> + getStorableVertexRangeMap() { + return vertexRangeMap; + } + + /** + * Based on a superstep, get the mapping of vertex range maxes to vertex + * ranges. This can be used to look up a particular vertex. + * + * @param superstep Superstep to get the vertex ranges for + * @return Cached map of max vertex range indices to vertex ranges + */ + public NavigableMap<I, VertexRange<I, V, E, M>> getVertexRangeMap( + long superstep) { + if (LOG.isDebugEnabled()) { + LOG.debug("getVertexRangeMap: Current superstep = " + + getSuperstep() + ", desired superstep = " + superstep); + } + + if (vertexRangeSuperstep == superstep) { + return vertexRangeMap; + } + vertexRangeSuperstep = superstep; + NavigableMap<I, VertexRange<I, V, E, M>> nextVertexRangeMap = + new TreeMap<I, VertexRange<I, V, E, M>>(); + String vertexRangeAssignmentsPath = + getVertexRangeAssignmentsPath(getApplicationAttempt(), + superstep); + try { + JSONArray vertexRangeAssignmentsArray = + new JSONArray( + new String(getZkExt().getData(vertexRangeAssignmentsPath, + false, + null))); + if (LOG.isDebugEnabled()) { + LOG.debug("getVertexRangeSet: Found vertex ranges " + + vertexRangeAssignmentsArray.toString() + + " on superstep " + superstep); + } + for (int i = 0; i < vertexRangeAssignmentsArray.length(); ++i) { + JSONObject vertexRangeObj = + vertexRangeAssignmentsArray.getJSONObject(i); + Class<I> indexClass = + BspUtils.getVertexIndexClass(getConfiguration()); + VertexRange<I, V, E, M> vertexRange = + new VertexRange<I, V, E, M>(indexClass, + vertexRangeObj); + if (nextVertexRangeMap.containsKey(vertexRange.getMaxIndex())) { + throw new IllegalStateException( + "getVertexRangeMap: Impossible that vertex range " + + "max " + vertexRange.getMaxIndex() + + " already exists! 
Duplicate vertex ranges include " + + nextVertexRangeMap.get(vertexRange.getMaxIndex()) + + " and " + vertexRange); + } + nextVertexRangeMap.put(vertexRange.getMaxIndex(), vertexRange); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + // Copy over the vertices to the vertex ranges + for (Entry<I, VertexRange<I, V, E, M>> entry : + nextVertexRangeMap.entrySet()) { + if (!vertexRangeMap.containsKey(entry.getKey())) { + continue; + } + VertexRange<I, V, E, M> vertexRange = + vertexRangeMap.get(entry.getKey()); + entry.getValue().getVertexMap().putAll( + vertexRange.getVertexMap()); + } + vertexRangeMap = nextVertexRangeMap; + return vertexRangeMap; + } + + public NavigableMap<I, VertexRange<I, V, E, M>> getCurrentVertexRangeMap() + { + return vertexRangeMap; + } + + /** + * Register an aggregator with name. + * + * @param name Name of the aggregator + * @param aggregatorClass Class of the aggregator + * @return Aggregator + * @throws IllegalAccessException + * @throws InstantiationException + */ + public final <A extends Writable> Aggregator<A> registerAggregator( + String name, + Class<? extends Aggregator<A>> aggregatorClass) + throws InstantiationException, IllegalAccessException { + if (aggregatorMap.get(name) != null) { + return null; + } + Aggregator<A> aggregator = + (Aggregator<A>) aggregatorClass.newInstance(); + @SuppressWarnings("unchecked") + Aggregator<Writable> writableAggregator = + (Aggregator<Writable>) aggregator; + aggregatorMap.put(name, writableAggregator); + if (LOG.isInfoEnabled()) { + LOG.info("registerAggregator: registered " + name); + } + return aggregator; + } + + /** + * Get aggregator by name. + * + * @param name + * @return Aggregator<A> (null when not registered) + */ + public final Aggregator<? extends Writable> getAggregator(String name) { + return aggregatorMap.get(name); + } + + /** + * Get the aggregator map. 
+ */ + public Map<String, Aggregator<Writable>> getAggregatorMap() { + return aggregatorMap; + } + + /** + * Register a BspEvent. Ensure that it will be signaled + * by catastrophic failure so that threads waiting on an event signal + * will be unblocked. + */ + public void registerBspEvent(BspEvent event) { + registeredBspEvents.add(event); + } + + /** + * Derived classes that want additional ZooKeeper events to take action + * should override this. + * + * @param event Event that occurred + * @return true if the event was processed here, false otherwise + */ + protected boolean processEvent(WatchedEvent event) { + return false; + } + + @Override + final public void process(WatchedEvent event) { + // 1. Process all shared events + // 2. Process specific derived class events + + if (LOG.isDebugEnabled()) { + LOG.debug("process: Got a new event, path = " + event.getPath() + + ", type = " + event.getType() + ", state = " + + event.getState()); + } + + if ((event.getPath() == null) && (event.getType() == EventType.None)) { + if (event.getState() == KeeperState.Disconnected) { + // No way to recover from a disconnect event, signal all BspEvents + for (BspEvent bspEvent : registeredBspEvents) { + bspEvent.signal(); + } + throw new RuntimeException( + "process: Disconnected from ZooKeeper, cannot recover."); + } else if (event.getState() == KeeperState.SyncConnected) { + if (LOG.isInfoEnabled()) { + LOG.info("process: Asynchronous connection complete."); + } + connectedEvent.signal(); + } + return; + } + + boolean eventProcessed = false; + if (event.getPath().startsWith(MASTER_JOB_STATE_PATH)) { + // This will cause all becomeMaster() MasterThreads to notice the + // change in job state and quit trying to become the master. 
+ masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if ((event.getPath().contains(WORKER_HEALTHY_DIR) || + event.getPath().contains(WORKER_UNHEALTHY_DIR)) && + (event.getType() == EventType.NodeChildrenChanged)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: workerHealthRegistrationChanged " + + "(worker health reported - healthy/unhealthy )"); + } + workerHealthRegistrationChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(INPUT_SPLITS_ALL_READY_PATH) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsReadyChanged " + + "(input splits ready)"); + } + inputSplitsAllReadyChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_RESERVED_NODE) && + (event.getType() == EventType.NodeDeleted)) { + if (LOG.isInfoEnabled()) { + LOG.info("process: inputSplitsStateChanged "+ + "(lost a reservation)"); + } + inputSplitsStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(INPUT_SPLIT_FINISHED_NODE) && + (event.getType() == EventType.NodeCreated)) { + if (LOG.isDebugEnabled()) { + LOG.debug("process: inputSplitsStateChanged " + + "(finished inputsplit)"); + } + inputSplitsStateChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(VERTEX_RANGE_ASSIGNMENTS_DIR) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: vertexRangeAssignmentsReadyChanged " + + "(vertex ranges are assigned)"); + } + vertexRangeAssignmentsReadyChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(VERTEX_RANGE_EXCHANGE_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: vertexRangeExchangeChildrenChanged " + + "(ready to exchanged vertex ranges)"); + } + vertexRangeExchangeChildrenChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains( + 
VERTEX_RANGE_EXCHANGED_FINISHED_NODE) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: vertexRangeExchangeFinishedChanged " + + "(vertex range exchange done)"); + } + vertexRangeExchangeFinishedChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(SUPERSTEP_FINISHED_NODE) && + event.getType() == EventType.NodeCreated) { + if (LOG.isInfoEnabled()) { + LOG.info("process: superstepFinished signaled"); + } + superstepFinished.signal(); + eventProcessed = true; + } else if (event.getPath().endsWith(APPLICATION_ATTEMPTS_PATH) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: applicationAttemptChanged signaled"); + } + applicationAttemptChanged.signal(); + eventProcessed = true; + } else if (event.getPath().contains(MASTER_ELECTION_DIR) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: masterElectionChildrenChanged signaled"); + } + masterElectionChildrenChanged.signal(); + eventProcessed = true; + } else if (event.getPath().equals(CLEANED_UP_PATH) && + event.getType() == EventType.NodeChildrenChanged) { + if (LOG.isInfoEnabled()) { + LOG.info("process: cleanedUpChildrenChanged signaled"); + } + cleanedUpChildrenChanged.signal(); + eventProcessed = true; + } + + if ((processEvent(event) == false) && (eventProcessed == false)) { + LOG.warn("process: Unknown and unprocessed event (path=" + + event.getPath() + ", type=" + event.getType() + + ", state=" + event.getState() + ")"); + } + } +}
