Repository: giraph Updated Branches: refs/heads/trunk 345f3db49 -> b2d77411a
GIRAPH-1188 closes #70 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b2d77411 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b2d77411 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b2d77411 Branch: refs/heads/trunk Commit: b2d77411ae4a43ddd2386c4a1db5e0191790acf6 Parents: 345f3db Author: Yuksel Akinci <yuks...@fb.com> Authored: Mon May 14 22:45:14 2018 -0700 Committer: Dionysios Logothetis <dionys...@fb.com> Committed: Mon May 14 22:45:14 2018 -0700 ---------------------------------------------------------------------- .../no_vtx/MessagesWithoutVerticesTest.java | 17 + .../java/org/apache/giraph/bsp/BspService.java | 14 +- .../org/apache/giraph/conf/GiraphConstants.java | 10 + .../org/apache/giraph/writable/kryo/Boxed.java | 36 ++ .../writable/kryo/GiraphClassResolver.java | 371 +++++++++++++++++ .../apache/giraph/writable/kryo/HadoopKryo.java | 15 +- .../giraph/writable/kryo/KryoSimpleWrapper.java | 2 +- .../java/org/apache/giraph/zk/ZooKeeperExt.java | 4 +- .../PageRankWithKryoSimpleWritable.java | 393 +++++++++++++++++++ .../giraph/examples/TestKryoPageRank.java | 98 +++++ 10 files changed, 951 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java ---------------------------------------------------------------------- diff --git a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java index bf3e194..dcdb002 100644 --- a/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java +++ 
b/giraph-block-app-8/src/test/java/org/apache/giraph/block_app/framework/no_vtx/MessagesWithoutVerticesTest.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.giraph.block_app.framework.no_vtx; import java.util.Iterator; http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java index c3fd141..632a1e6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java +++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java @@ -25,6 +25,7 @@ import org.apache.giraph.job.JobProgressTracker; import org.apache.giraph.partition.GraphPartitionerFactory; import org.apache.giraph.utils.CheckpointingUtils; import org.apache.giraph.worker.WorkerInfo; +import org.apache.giraph.writable.kryo.GiraphClassResolver; import org.apache.giraph.zk.BspEvent; import org.apache.giraph.zk.PredicateLock; import org.apache.giraph.zk.ZooKeeperExt; @@ -81,7 +82,9 @@ public abstract class BspService<I 
extends WritableComparable, /** Input splits all done node*/ public static final String INPUT_SPLITS_ALL_DONE_NODE = "/_inputSplitsAllDone"; - + /** Directory to store kryo className-ID assignment */ + public static final String KRYO_REGISTERED_CLASS_DIR = + "/_kryo"; /** Directory of attempts of this application */ public static final String APPLICATION_ATTEMPTS_DIR = "/_applicationAttemptsDir"; @@ -155,6 +158,8 @@ public abstract class BspService<I extends WritableComparable, protected final String haltComputationPath; /** Path where memory observer stores data */ protected final String memoryObserverPath; + /** Kryo className-ID mapping directory */ + protected final String kryoRegisteredClassPath; /** Private ZooKeeper instance that implements the service */ private final ZooKeeperExt zk; /** Has the Connection occurred? */ @@ -250,7 +255,7 @@ public abstract class BspService<I extends WritableComparable, inputSplitsAllDonePath = basePath + INPUT_SPLITS_ALL_DONE_NODE; applicationAttemptsPath = basePath + APPLICATION_ATTEMPTS_DIR; cleanedUpPath = basePath + CLEANED_UP_DIR; - + kryoRegisteredClassPath = basePath + KRYO_REGISTERED_CLASS_DIR; String restartJobId = RESTART_JOB_ID.get(conf); @@ -289,6 +294,11 @@ public abstract class BspService<I extends WritableComparable, throw new RuntimeException(e); } + boolean disableGiraphResolver = + GiraphConstants.DISABLE_GIRAPH_CLASS_RESOLVER.get(conf); + if (!disableGiraphResolver) { + GiraphClassResolver.setZookeeperInfo(zk, kryoRegisteredClassPath); + } this.taskId = (int) getApplicationAttempt() * conf.getMaxWorkers() + conf.getTaskPartition(); this.hostnameTaskId = hostname + "_" + getTaskId(); http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java 
b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index db13670..4c02fff 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -1292,5 +1292,15 @@ public interface GiraphConstants { /** Number of supersteps job will run for */ IntConfOption SUPERSTEP_COUNT = new IntConfOption("giraph.numSupersteps", -1, "Number of supersteps job will run for"); + + /** Whether to disable GiraphClassResolver which is an efficient + * implementation of kryo class resolver. By default this resolver is used by + * KryoSimpleWritable and KryoSimpleWrapper, and can be disabled with this + * option */ + BooleanConfOption DISABLE_GIRAPH_CLASS_RESOLVER = + new BooleanConfOption("giraph.disableGiraphClassResolver", false, + "Disables GiraphClassResolver, which is a custom implementation " + + "of kryo class resolver that avoids writing class names to the " + + "underlying stream for faster serialization."); } // CHECKSTYLE: resume InterfaceIsTypeCheck http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java new file mode 100644 index 0000000..087f0dd --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/Boxed.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; + +/** + * Mutable holder for a single value (e.g. implemented by KryoSimpleWrapper). + * @param <T> Type of the boxed value + */ +public interface Boxed<T> { + /** + * Gets the boxed value. + * @return The value currently held by this box. + */ + T get(); + + /** + * Sets the boxed value. + * @param value New value to hold + */ + void set(T value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java new file mode 100644 index 0000000..80e7011 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/GiraphClassResolver.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.writable.kryo; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.KryoException; +import com.esotericsoftware.kryo.Registration; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.esotericsoftware.kryo.util.DefaultClassResolver; +import com.esotericsoftware.kryo.util.ObjectMap; +import org.apache.giraph.zk.ZooKeeperExt; +import org.apache.log4j.Logger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import static com.esotericsoftware.kryo.util.Util.getWrapperClass; + +/** + * In order to avoid writing class names to the stream, this class resolver + * assigns unique integers to each class name and writes/reads those integers + * to/from the stream. Reads assume that there is already a class assigned + * to the given integer. This resolver only assigns unique integers for + * classes that are not explicitly registered since those classes are already + * assigned unique integers at the time of registration. This implementation + * uses zookeeper to provide consistent class name to ID mapping across all + * nodes.
+ * + * If resolver encounters a class name that has not been assigned to a unique + * integer yet, it creates a class node in zookeeper under a designated path + * with persistent_sequential mode - allowing the file name of the class node + * to be suffixed with an auto incremented integer. After the class node is + * created, the resolver reads back all the nodes under the designated path + * and uses the unique suffix as the class id. If there are duplicate entries + * for the same class name due to some race condition, the lowest suffix is + * used. + * + * The name/ID caches are static and shared by all resolver instances. They + * are read without holding a lock, so they are concurrent maps; getClassId + * and getClassName only refresh them while holding the GiraphClassResolver + * class lock. + */ +public class GiraphClassResolver extends DefaultClassResolver { + /** Base ID to start for class name assignments. + * This number has to be high enough to not conflict with + * explicitly registered class IDs. + * */ + private static final int BASE_CLASS_ID = 1000; + + /** Class logger */ + private static final Logger LOG = + Logger.getLogger(GiraphClassResolver.class); + + /** Class name to ID cache (read lock-free, hence concurrent) */ + private static final Map<String, Integer> CLASS_NAME_TO_ID = + new ConcurrentHashMap<>(); + /** ID to class name cache (read lock-free, hence concurrent) */ + private static final Map<Integer, String> ID_TO_CLASS_NAME = + new ConcurrentHashMap<>(); + /** Zookeeper */ + private static ZooKeeperExt ZK; + /** Zookeeper path for automatic class registrations */ + private static String KRYO_REGISTERED_CLASS_PATH; + /** Minimum class ID assigned by zookeeper sequencing */ + private static int MIN_CLASS_ID = -1; + /** True if the zookeeper class registration path is already created */ + private static boolean IS_CLASS_PATH_CREATED = false; + + /** Memoized class id */ + private int memoizedClassId = -1; + /** Memoized class registration */ + private Registration memoizedClassIdValue; + + /** + * Sets zookeeper information. + * @param zookeeperExt ZookeeperExt + * @param kryoClassPath Zookeeper directory path where class Name-ID + * mapping is stored. + */ + public static void setZookeeperInfo(ZooKeeperExt zookeeperExt, + String kryoClassPath) { + ZK = zookeeperExt; + KRYO_REGISTERED_CLASS_PATH = kryoClassPath; + } + + /** + * Returns true once setZookeeperInfo has supplied a ZooKeeper handle. + * @return True if the zookeeper is initialized. + */ + public static boolean isInitialized() { + return ZK != null; + } + + /** + * Creates a new node for the given class name. + * Creation mode is persistent sequential, i.e. + * ZK will always create a new node. There could be + * multiple entries for the same class name but since + * the lowest index is used, this is not a problem.
+ * @param className Class name + */ + public static void createClassName(String className) { + try { + String path = KRYO_REGISTERED_CLASS_PATH + "/" + className; + ZK.createExt(path, + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT_SEQUENTIAL, + true); + } catch (KeeperException e) { + throw new IllegalStateException( + "Failed to create class " + className, e); + } catch (InterruptedException e) { + // Restore the interrupt status before converting to an unchecked + // exception so callers up the stack can still observe it. + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Interrupted while creating " + className, e); + } + } + + /** + * Refreshes class-ID mapping from zookeeper. + * Not thread safe: it mutates MIN_CLASS_ID and IS_CLASS_PATH_CREATED, so + * callers should hold the GiraphClassResolver class lock (as getClassId + * and getClassName do). + */ + public static void refreshCache() { + if (!IS_CLASS_PATH_CREATED) { + try { + ZK.createOnceExt(KRYO_REGISTERED_CLASS_PATH, + null, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT, + true); + IS_CLASS_PATH_CREATED = true; + } catch (KeeperException e) { + throw new IllegalStateException( + "Failed to refresh kryo cache " + + KRYO_REGISTERED_CLASS_PATH, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Interrupted while refreshing kryo cache " + + KRYO_REGISTERED_CLASS_PATH, e); + } + } + + List<String> registeredList; + try { + // NOTE(review): the flags presumably mean watch=false, + // sequenceSorted=true, fullPath=false; the "lowest suffix wins" rule + // below relies on that sequence-sorted order. + registeredList = + ZK.getChildrenExt(KRYO_REGISTERED_CLASS_PATH, + false, + true, + false); + } catch (KeeperException e) { + throw new IllegalStateException( + "Failed to retrieve child nodes for " + KRYO_REGISTERED_CLASS_PATH, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException( + "Interrupted while retrieving child nodes for " + + KRYO_REGISTERED_CLASS_PATH, e); + } + + for (String name : registeredList) { + // Since these nodes are created with PERSISTENT_SEQUENTIAL mode, + // ZooKeeper appends a fixed-width sequence number to their name.
+ if (LOG.isDebugEnabled()) { + LOG.debug("Registered class: " + name); + } + String className = name.substring(0, + name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH); + int classId = Integer.parseInt( + name.substring(name.length() - ZooKeeperExt.SEQUENCE_NUMBER_LENGTH)); + + // The first sequence number ever observed becomes the origin that is + // mapped to BASE_CLASS_ID. + if (MIN_CLASS_ID == -1) { + MIN_CLASS_ID = classId; + } + + int adjustedId = classId - MIN_CLASS_ID + BASE_CLASS_ID; + // Keep the first ID seen for a class name; later duplicates are ignored. + if (CLASS_NAME_TO_ID.putIfAbsent(className, adjustedId) == null) { + ID_TO_CLASS_NAME.put(adjustedId, className); + } + } + } + + /** + * Gets ID for the given class name, creating a zookeeper class node for + * it if no ID has been assigned yet. + * @param className Class name + * @return Class ID + */ + public static int getClassId(String className) { + // Fast path: cache entries are never removed, so a hit is final. + Integer id = CLASS_NAME_TO_ID.get(className); + if (id != null) { + return id; + } + synchronized (GiraphClassResolver.class) { + id = CLASS_NAME_TO_ID.get(className); + if (id != null) { + return id; + } + refreshCache(); + + if (!CLASS_NAME_TO_ID.containsKey(className)) { + createClassName(className); + refreshCache(); + } + id = CLASS_NAME_TO_ID.get(className); + } + + if (id == null) { + throw new IllegalStateException("Failed to assigned id to " + className); + } + + return id; + } + + /** + * Get class name for given ID.
+ * @param id class ID + * @return class name + * @throws IllegalStateException if no class is mapped to the ID, even + * after refreshing from zookeeper + */ + public static String getClassName(int id) { + // Fast path: cache entries are never removed, so a hit is final. + String className = ID_TO_CLASS_NAME.get(id); + if (className != null) { + return className; + } + synchronized (GiraphClassResolver.class) { + className = ID_TO_CLASS_NAME.get(id); + if (className == null) { + refreshCache(); + className = ID_TO_CLASS_NAME.get(id); + } + } + + if (className == null) { + throw new IllegalStateException("ID " + id + " doesn't exist"); + } + return className; + } + + /** + * Registers a class under its numeric ID. Unlike DefaultClassResolver, + * name-based registrations (ID == NAME) are rejected. + * @param registration Registration + * @return The given registration + */ + @Override + public Registration register(Registration registration) { + if (registration == null) { + throw new IllegalArgumentException("registration cannot be null"); + } + if (registration.getId() == NAME) { + throw new IllegalArgumentException("Invalid registration ID"); + } + + idToRegistration.put(registration.getId(), registration); + classToRegistration.put(registration.getType(), registration); + if (registration.getType().isPrimitive()) { + classToRegistration.put(getWrapperClass(registration.getType()), + registration); + } + return registration; + } + + /** + * Registers a not explicitly registered class under the cluster-wide ID + * obtained from getClassId. + * @param type Class type + * @return Registration + */ + @Override + public Registration registerImplicit(Class type) { + return register(new Registration(type, kryo.getDefaultSerializer(type), + getClassId(type.getName()))); + } + + @Override + public Registration writeClass(Output output, Class type) { + if (type == null) { + output.writeVarInt(Kryo.NULL, true); + return null; + } + + Registration registration = kryo.getRegistration(type); + if (registration.getId() == NAME) { + throw new IllegalStateException("Invalid registration ID"); + } else { + // Class IDs are incremented by 2 when writing, because 0 is used + // for null and 1 (NAME + 2) is the class-name marker, which this + // resolver never writes.
+ output.writeVarInt(registration.getId() + 2, true); + } + return registration; + } + + @Override + public Registration readClass(Input input) { + int classID = input.readVarInt(true); + if (classID == Kryo.NULL) { + return null; + } else if (classID == NAME + 2) { + throw new IllegalStateException("Invalid class ID"); + } + if (classID == memoizedClassId) { + return memoizedClassIdValue; + } + // Undo the +2 offset applied by writeClass. + Registration registration = idToRegistration.get(classID - 2); + if (registration == null) { + // Not registered locally yet: resolve the name through the shared + // ID mapping and register it under the same ID. + String className = getClassName(classID - 2); + Class type = getTypeByName(className); + if (type == null) { + try { + type = Class.forName(className, false, kryo.getClassLoader()); + } catch (ClassNotFoundException ex) { + throw new KryoException("Unable to find class: " + className, ex); + } + if (nameToClass == null) { + nameToClass = new ObjectMap(); + } + nameToClass.put(className, type); + } + registration = new Registration(type, kryo.getDefaultSerializer(type), + classID - 2); + register(registration); + } + memoizedClassId = classID; + memoizedClassIdValue = registration; + return registration; + } + + /** + * Reset the internal state. + * Reset clears two hash tables: + * 1 - Class name to ID: Every non-explicitly registered class takes the + * ID agreed by all kryo instances, and it doesn't change across + * serializations, so this reset is not required. + * 2 - Reference tracking: Not required because it is disabled. + * + * Therefore, this method should not be invoked. + * + * @throws IllegalStateException always + */ + @Override + public void reset() { + throw new IllegalStateException("Not implemented"); + } + + /** + * This method writes the class name for the first encountered + * non-explicitly registered class. Since all non-explicitly registered + * classes take the ID agreed by all kryo instances, there is no need + * to write the class name, so this method should not be invoked.
+ * @param output Output stream + * @param type Class type + * @param registration Registration + * @throws IllegalStateException always + */ + @Override + protected void writeName(Output output, Class type, + Registration registration) { + throw new IllegalStateException("Not implemented"); + } + + /** + * This method reads the class name for the first encountered + * non-explicitly registered class. Since all non-explicitly registered + * classes take the ID agreed by all kryo instances, class name is + * never written, so this method should not be invoked. + * @param input Input stream + * @return Registration + * @throws IllegalStateException always + */ + @Override + protected Registration readName(Input input) { + throw new IllegalStateException("Not implemented"); + } + + /** + * Get type by class name from the local name-to-class cache. + * @param className Class name + * @return class type, or null if it has not been resolved yet + */ + protected Class<?> getTypeByName(final String className) { + return nameToClass != null ? nameToClass.get(className) : null; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java index fb1186b..2713316 100644 --- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/HadoopKryo.java @@ -308,8 +308,12 @@ public class HadoopKryo extends Kryo { if (trackReferences) { kryo = new HadoopKryo(); } else { - // TODO: if trackReferences is false use custom class resolver. - kryo = new HadoopKryo(new DefaultClassResolver(), + // Only use GiraphClassResolver if it is properly initialized. + // This is to enable test cases which use KryoSimpleWrapper + // but don't start ZK. + kryo = new HadoopKryo( + GiraphClassResolver.isInitialized() ?
new GiraphClassResolver() : + new DefaultClassResolver(), new MapReferenceResolver()); } @@ -406,8 +410,11 @@ public class HadoopKryo extends Kryo { if (!trackReferences) { kryo.setReferences(false); - // TODO: Enable the following when a custom class resolver is created. - // kryo.setAutoReset(false); + // Auto reset can only be disabled if the GiraphClassResolver is + // properly initialized. + if (GiraphClassResolver.isInitialized()) { + kryo.setAutoReset(false); + } } return kryo; } http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java index 9c5de74..3cb291d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoSimpleWrapper.java @@ -39,7 +39,7 @@ import org.apache.hadoop.io.Writable; * * @param <T> Object type */ -public class KryoSimpleWrapper<T> implements Writable { +public class KryoSimpleWrapper<T> implements Writable, Boxed<T> { /** Wrapped object */ private T object; http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java index 1eb4c8b..a20c494 100644 --- a/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java +++ b/giraph-core/src/main/java/org/apache/giraph/zk/ZooKeeperExt.java @@ -42,10 +42,10 @@ import org.apache.zookeeper.ZooKeeper; * should be thread-safe. 
*/ public class ZooKeeperExt { + /** Length of the ZK sequence number */ + public static final int SEQUENCE_NUMBER_LENGTH = 10; /** Internal logger */ private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class); - /** Length of the ZK sequence number */ - private static final int SEQUENCE_NUMBER_LENGTH = 10; /** Internal ZooKeeper */ private final ZooKeeper zooKeeper; /** Ensure we have progress */ http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java new file mode 100644 index 0000000..4e9e1ed --- /dev/null +++ b/giraph-examples/src/main/java/org/apache/giraph/examples/PageRankWithKryoSimpleWritable.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.giraph.examples; + +import com.google.common.collect.Lists; +import org.apache.giraph.aggregators.BasicAggregator; +import org.apache.giraph.aggregators.LongSumAggregator; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.EdgeFactory; +import org.apache.giraph.graph.BasicComputation; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.io.VertexReader; +import org.apache.giraph.io.formats.GeneratedVertexInputFormat; +import org.apache.giraph.master.DefaultMasterCompute; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.writable.kryo.KryoSimpleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.VertexValue; +import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.MessageValue; +import org.apache.giraph.examples.PageRankWithKryoSimpleWritable.EdgeValue; + +/** + * Copy of SimplePageRank, modified to test vertex/edge and + * message values that derives from KryoSimpleWritable. 
+ */ +@Algorithm( + name = "Page rank" +) +public class PageRankWithKryoSimpleWritable extends + BasicComputation<LongWritable, VertexValue, + EdgeValue, MessageValue> { + /** Number of supersteps for this test */ + public static final int MAX_SUPERSTEPS = 30; + /** Number of supersteps for this static 3; + /** Logger */ + private static final Logger LOG = + Logger.getLogger(PageRankWithKryoSimpleWritable.class); + /** Sum aggregator name */ + private static String SUM_AGG = "sum"; + /** Min aggregator name */ + private static String MIN_AGG = "min"; + /** Max aggregator name */ + private static String MAX_AGG = "max"; + + @Override + public void compute( + Vertex<LongWritable, VertexValue, + EdgeValue> vertex, + Iterable<MessageValue> messages) throws IOException { + if (getSuperstep() >= 1) { + double sum = 0; + for (MessageValue message : messages) { + sum += message.get(); + } + Double value = (0.15f / getTotalNumVertices()) + 0.85f * sum; + VertexValue vertexValue = new VertexValue(value); + vertex.setValue(vertexValue); + aggregate(MAX_AGG, vertexValue); + aggregate(MIN_AGG, vertexValue); + aggregate(SUM_AGG, new LongWritable(1)); + LOG.info(vertex.getId() + ": PageRank=" + vertexValue + + " max=" + getAggregatedValue(MAX_AGG) + + " min=" + getAggregatedValue(MIN_AGG)); + } + + if (getSuperstep() < MAX_SUPERSTEPS) { + long edges = vertex.getNumEdges(); + sendMessageToAllEdges(vertex, + new MessageValue(vertex.getValue().get() / edges)); + } else { + vertex.voteToHalt(); + } + } + + /** + * Worker context used with {@link PageRankWithKryoSimpleWritable}. 
+ */ + public static class PageRankWithKryoWorkerContext extends + WorkerContext { + /** Final max value for verification for local jobs */ + private static double FINAL_MAX; + /** Final min value for verification for local jobs */ + private static double FINAL_MIN; + /** Final sum value for verification for local jobs */ + private static long FINAL_SUM; + + public static double getFinalMax() { + return FINAL_MAX; + } + + public static double getFinalMin() { + return FINAL_MIN; + } + + public static long getFinalSum() { + return FINAL_SUM; + } + + @Override + public void preApplication() + throws InstantiationException, IllegalAccessException { + } + + @Override + public void postApplication() { + FINAL_SUM = this.<LongWritable>getAggregatedValue(SUM_AGG).get(); + FINAL_MAX = this.<VertexValue>getAggregatedValue(MAX_AGG).get(); + FINAL_MIN = this.<VertexValue>getAggregatedValue(MIN_AGG).get(); + + LOG.info("aggregatedNumVertices=" + FINAL_SUM); + LOG.info("aggregatedMaxPageRank=" + FINAL_MAX); + LOG.info("aggregatedMinPageRank=" + FINAL_MIN); + } + + @Override + public void preSuperstep() { + if (getSuperstep() >= 3) { + LOG.info("aggregatedNumVertices=" + + getAggregatedValue(SUM_AGG) + + " NumVertices=" + getTotalNumVertices()); + if (this.<LongWritable>getAggregatedValue(SUM_AGG).get() != + getTotalNumVertices()) { + throw new RuntimeException("wrong value of SumAggreg: " + + getAggregatedValue(SUM_AGG) + ", should be: " + + getTotalNumVertices()); + } + VertexValue maxPagerank = getAggregatedValue(MAX_AGG); + LOG.info("aggregatedMaxPageRank=" + maxPagerank.get()); + VertexValue minPagerank = getAggregatedValue(MIN_AGG); + LOG.info("aggregatedMinPageRank=" + minPagerank.get()); + } + } + + @Override + public void postSuperstep() { } + } + + /** + * Master compute associated with {@link PageRankWithKryoSimpleWritable}. + * It registers required aggregators. 
+ */ + public static class PageRankWithKryoMasterCompute extends + DefaultMasterCompute { + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + registerAggregator(SUM_AGG, LongSumAggregator.class); + registerPersistentAggregator(MIN_AGG, DoubleMinWrapperAggregator.class); + registerPersistentAggregator(MAX_AGG, DoubleMaxWrapperAggregator.class); + } + } + + /** + * Simple VertexReader that supports {@link PageRankWithKryoSimpleWritable} + */ + public static class PageRankWithKryoVertexReader extends + GeneratedVertexReader<LongWritable, VertexValue, EdgeValue> { + /** Class logger */ + private static final Logger LOG = + Logger.getLogger( + PageRankWithKryoSimpleWritable.PageRankWithKryoVertexReader.class); + + @Override + public boolean nextVertex() { + return totalRecords > recordsRead; + } + + @Override + public Vertex<LongWritable, VertexValue, EdgeValue> + getCurrentVertex() throws IOException { + Vertex<LongWritable, VertexValue, EdgeValue> vertex = + getConf().createVertex(); + LongWritable vertexId = new LongWritable( + (inputSplit.getSplitIndex() * totalRecords) + recordsRead); + VertexValue vertexValue = new VertexValue(vertexId.get() * 10d); + long targetVertexId = + (vertexId.get() + 1) % + (inputSplit.getNumSplits() * totalRecords); + float edgeValue = vertexId.get() * 100f; + List<Edge<LongWritable, EdgeValue>> edges = Lists.newLinkedList(); + edges.add(EdgeFactory.create(new LongWritable(targetVertexId), + new EdgeValue(edgeValue))); + vertex.initialize(vertexId, vertexValue, edges); + ++recordsRead; + if (LOG.isInfoEnabled()) { + LOG.info("next: Return vertexId=" + vertex.getId().get() + + ", vertexValue=" + vertex.getValue() + + ", targetVertexId=" + targetVertexId + ", edgeValue=" + edgeValue); + } + return vertex; + } + } + + /** + * VertexInputFormat that supports {@link PageRankWithKryoSimpleWritable} + */ + public static class PageRankWithKryoVertexInputFormat extends + 
GeneratedVertexInputFormat<LongWritable, VertexValue, EdgeValue> { + @Override + public VertexReader<LongWritable, VertexValue, + EdgeValue> createVertexReader(InputSplit split, + TaskAttemptContext context) + throws IOException { + return new PageRankWithKryoVertexReader(); + } + } + + /** + * Creating a custom vertex value class to force kryo to + * register with a new ID. Please note that a custom + * class containing a double array is not + * necessary for the page rank application. It is only + * used for testing the scenario of kryo encountering an + * unregistered custom class. + */ + public static class VertexValue extends KryoSimpleWritable { + /** Storing the value in an array. + Double array is an unregistered type + hence kryo will assign a unique class id */ + private double[] ranks; + + /** Constructor */ + public VertexValue() { + } + + /** + * Constructor + * @param val Vertex value + */ + public VertexValue(Double val) { + ranks = new double[1]; + + ranks[0] = val; + } + + /** + * Get vertex value + * @return Vertex value + */ + public Double get() { + return ranks[0]; + } + + /** + * Set vertex value. + * @param val Vertex value + */ + public void set(Double val) { + this.ranks[0] = val; + } + } + + /** + * Creating a custom edge value class to force kryo to + * register with a new ID. Please note that a custom + * class containing a float is not + * necessary for the page rank application. It is only + * used for testing the scenario of kryo encountering an + * unregistered custom class. 
+ */ + public static class EdgeValue extends KryoSimpleWritable { + /** Edge value */ + private Float realValue; + + /** Constructor */ + public EdgeValue() { + } + /** + * Constructor + * @param val Edge value + */ + public EdgeValue(Float val) { + realValue = val; + } + + /** + * Get edge value + * @return Edge value + */ + public Float get() { + return realValue; + } + + /** + * Set edge value + * @param val Edge value + */ + public void set(Float val) { + this.realValue = val; + } + } + + /** + * Creating a custom message value class to force kryo to + * register with a new ID. Please note that a custom + * class containing a double list is not + * necessary for the page rank application. It is only + * used for testing the scenario of kryo encountering an + * unregistered custom class. + */ + public static class MessageValue extends KryoSimpleWritable { + /** Storing the message in a list to test the list type */ + private List<Double> msgValue; + + /** Constructor */ + public MessageValue() { + } + + /** + * Constructor + * @param val Message value + */ + public MessageValue(Double val) { + msgValue = new ArrayList<>(); + msgValue.add(val); + } + + /** + * Get message value + * @return Message value + */ + public Double get() { + return msgValue.get(0); + } + + /** + * Set message value + * @param val Message value + */ + public void set(Double val) { + this.msgValue.set(0, val); + } + } + + + /** + * Aggregator for getting max double value + */ + public static class DoubleMaxWrapperAggregator extends + BasicAggregator<VertexValue> { + @Override + public void aggregate(VertexValue value) { + getAggregatedValue().set( + Math.max(getAggregatedValue().get(), value.get())); + } + + @Override + public VertexValue createInitialValue() { + return new VertexValue(Double.NEGATIVE_INFINITY); + } + } + + /** + * Aggregator for getting min double value. 
*/
  public static class DoubleMinWrapperAggregator
      extends BasicAggregator<VertexValue> {
    /**
     * Folds {@code value} into the running minimum, in place.
     *
     * @param value candidate to compare against the current minimum
     */
    @Override
    public void aggregate(VertexValue value) {
      getAggregatedValue().set(
          Math.min(getAggregatedValue().get(), value.get()));
    }

    /**
     * Starts the minimum at positive infinity, the identity element for
     * {@code min}. This mirrors the max aggregator's negative-infinity
     * start. With {@code Double.MAX_VALUE}, an all-infinite input would
     * wrongly report MAX_VALUE.
     *
     * @return a fresh initial value
     */
    @Override
    public VertexValue createInitialValue() {
      return new VertexValue(Double.POSITIVE_INFINITY);
    }
  }

}
http://git-wip-us.apache.org/repos/asf/giraph/blob/b2d77411/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java ---------------------------------------------------------------------- diff --git a/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java b/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java new file mode 100644 index 0000000..69f5a83 --- /dev/null +++ b/giraph-examples/src/test/java/org/apache/giraph/examples/TestKryoPageRank.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
*/
package org.apache.giraph.examples;

import org.apache.giraph.BspCase;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.job.GiraphJob;
import org.junit.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
 * Test page rank with kryo wrapper
 */
public class TestKryoPageRank extends BspCase {

  /**
   * Constructor. Passes this test class's own name to {@link BspCase}.
   */
  public TestKryoPageRank() {
    super(TestKryoPageRank.class.getName());
  }

  @Test
  public void testKryoPageRank()
      throws ClassNotFoundException, IOException, InterruptedException {
    testPageRankWithKryoWrapper(1);
  }

  @Test
  public void testKryoPageRankTenThreadsCompute()
      throws ClassNotFoundException, IOException, InterruptedException {
    testPageRankWithKryoWrapper(10);
  }


  /**
   * Testing simple page rank by wrapping vertex value, edge
   * and message values with kryo wrapper.
   * When not running in distributed mode, this also checks the final
   * max/min page rank and vertex count recorded by the worker context.
   *
   * @param numComputeThreads Number of compute threads to use
   * @throws java.io.IOException propagated from job setup or execution
   * @throws ClassNotFoundException propagated from job setup or execution
   * @throws InterruptedException propagated from job execution
   */
  private void testPageRankWithKryoWrapper(int numComputeThreads)
      throws IOException, InterruptedException, ClassNotFoundException {
    GiraphConfiguration conf = new GiraphConfiguration();
    conf.setComputationClass(PageRankWithKryoSimpleWritable.class);
    conf.setVertexInputFormatClass(
        PageRankWithKryoSimpleWritable.PageRankWithKryoVertexInputFormat.class);
    conf.setWorkerContextClass(
        PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.class);
    conf.setMasterComputeClass(
        PageRankWithKryoSimpleWritable.PageRankWithKryoMasterCompute.class);
    conf.setNumComputeThreads(numComputeThreads);
    // Set enough partitions to generate randomness on the compute side
    if (numComputeThreads != 1) {
      GiraphConstants.USER_PARTITION_COUNT.set(conf, numComputeThreads * 5);
    }
    GiraphJob job = prepareJob(getCallingMethodName(), conf);
    assertTrue(job.run(true));
    if (!runningInDistributedMode()) {
      // The worker context's static fields are only visible in-process.
      double maxPageRank =
          PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMax();
      double minPageRank =
          PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalMin();
      long numVertices =
          PageRankWithKryoSimpleWritable.PageRankWithKryoWorkerContext.getFinalSum();
      System.out.println(getCallingMethodName() + ": maxPageRank=" +
          maxPageRank + " minPageRank=" +
          minPageRank + " numVertices=" + numVertices + ", " +
          " numComputeThreads=" + numComputeThreads);
      assertEquals(34.03, maxPageRank, 0.001);
      assertEquals(0.03, minPageRank, 0.00001);
      assertEquals(5L, numVertices);
    }
  }
}