Repository: giraph Updated Branches: refs/heads/trunk ad27a2914 -> 79e7f1c98
[GIRAPH-1013] Add local (single machine) implementation Summary: This allows you to run application written in Blocks Framework very efficiently on single machine. Specifically this is interesting for having fast unit tests. Test Plan: mvn clean install -Phadoop_facebook Making TargetVertexIdIterator public is in addition to just adding classes to open source Reviewers: maja.kabiljo, dionysis.logothetis, sergey.edunov Reviewed By: sergey.edunov Differential Revision: https://reviews.facebook.net/D39717 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/79e7f1c9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/79e7f1c9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/79e7f1c9 Branch: refs/heads/trunk Commit: 79e7f1c98575a473d12022e198679614b1fe9029 Parents: ad27a29 Author: Igor Kabiljo <[email protected]> Authored: Mon Jun 8 11:48:28 2015 -0700 Committer: Igor Kabiljo <[email protected]> Committed: Fri Jun 12 20:34:16 2015 -0700 ---------------------------------------------------------------------- .../api/local/InternalAggregators.java | 142 ++++++ .../framework/api/local/InternalApi.java | 432 +++++++++++++++++++ .../api/local/InternalMessageStore.java | 423 ++++++++++++++++++ .../framework/api/local/LocalBlockRunner.java | 247 +++++++++++ .../framework/api/local/VertexSaver.java | 34 ++ .../framework/api/local/package-info.java | 26 ++ .../apache/giraph/comm/SendMessageCache.java | 7 +- .../writable/kryo/KryoWritableWrapper.java | 13 + 8 files changed, 1321 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java new file mode 100644 index 0000000..dbcc9f1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalAggregators.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.local; + +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.reducers.Reducer; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.Writable; + +import com.google.common.collect.Maps; + +/** + * Internal aggregators implementation + */ +@SuppressWarnings("unchecked") +class InternalAggregators + implements MasterGlobalCommUsage, WorkerGlobalCommUsage { + private final boolean runAllChecks; + + /** Map of reducers registered for the next worker computation */ + private final Map<String, Reducer<Object, Writable>> reducerMap = + Maps.newHashMap(); + /** Map of values to be sent to workers for next computation */ + private final Map<String, Writable> broadcastMap = + Maps.newHashMap(); + /** Values reduced from previous computation */ + private final Map<String, Writable> reducedMap = + Maps.newHashMap(); + + public InternalAggregators(boolean runAllChecks) { + this.runAllChecks = runAllChecks; + } + + private static <T> T getOrThrow( + Map<String, T> map, String mapName, String key) { + T value = map.get(key); + if (value == null) { + throw new IllegalArgumentException( + key + " not present in " + mapName); + } + return value; + } + + @Override + public void broadcast(String name, Writable value) { + broadcastMap.put(name, value); + } + + @Override + public <B extends Writable> B getBroadcast(String name) { + return (B) getOrThrow(broadcastMap, "broadcastMap", name); + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp) { + registerReducer(name, reduceOp, reduceOp.createInitialValue()); + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp, + R globalInitialValue) { + if (reducerMap.containsKey(name)) { + throw new IllegalArgumentException( + "Reducer with name " + name + " was already registered, " + + " and is " + reducerMap.get(name).getReduceOp() + + ", and we are trying to " + " register " + reduceOp); + } + if (reduceOp == null) { + throw new IllegalArgumentException( + "null reducer cannot be registered, with name " + name); + } + if (globalInitialValue == null) { + throw new IllegalArgumentException( + "global initial value for reducer cannot be null, but is for " + + reduceOp + " with naem" + name); + } + + Reducer<S, R> reducer = new Reducer<>(reduceOp, globalInitialValue); + reducerMap.put(name, (Reducer<Object, Writable>) reducer); + } + + @Override + public void reduce(String name, Object value) { + Reducer<Object, Writable> reducer = + getOrThrow(reducerMap, "reducerMap", name); + synchronized (reducer) { + reducer.reduce(value); + } + } + + @Override + public void reduceMerge(String name, Writable value) { + Reducer<Object, Writable> reducer = + getOrThrow(reducerMap, "reducerMap", name); + synchronized (reducer) { + reducer.reduceMerge(value); + } + } + + @Override + public <R extends Writable> R getReduced(String name) { + return (R) getOrThrow(reducedMap, "reducedMap", name); + } + + public synchronized void afterWorkerBeforeMaster() { + broadcastMap.clear(); + reducedMap.clear(); + for (Entry<String, Reducer<Object, Writable>> entry : + reducerMap.entrySet()) { + Writable value = entry.getValue().getCurrentValue(); + if (runAllChecks) { + Writable newValue = entry.getValue().createInitialValue(); + WritableUtils.copyInto(value, newValue); + value = newValue; + } + reducedMap.put(entry.getKey(), value); + } + reducerMap.clear(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java new file mode 100644 index 0000000..99e9e24 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalApi.java @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.local; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor; +import org.apache.giraph.block_app.framework.api.Counter; +import org.apache.giraph.block_app.framework.api.local.InternalMessageStore.InternalConcurrentMessageStore; +import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.block_app.framework.output.BlockOutputDesc; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.giraph.block_app.framework.output.BlockOutputWriter; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl; +import org.apache.giraph.comm.SendMessageCache.TargetVertexIdIterator; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.graph.VertexMutations; +import org.apache.giraph.graph.VertexResolver; +import org.apache.giraph.master.AggregatorToGlobalCommTranslation; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.utils.TestGraph; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerAggregatorDelegator; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * Internal implementation of Block API interfaces - representing an in-memory + * giraph instance. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +class InternalApi<I extends WritableComparable, V extends Writable, + E extends Writable> implements BlockMasterApi, BlockOutputHandleAccessor { + private final TestGraph<I, V, E> graph; + private final ImmutableClassesGiraphConfiguration conf; + private final boolean runAllChecks; + private final InternalAggregators globalComm; + private final AggregatorToGlobalCommTranslation aggregators; + + private final boolean createVertexOnMsgs; + private final ConcurrentHashMap<I, VertexMutations<I, V, E>> mutations; + + private InternalMessageStore previousMessages; + private InternalMessageStore nextMessages; + + private final InternalWorkerApi workerApi; + private final BlockWorkerContextLogic workerContextLogic; + private List<Writable> previousWorkerMessages; + private List<Writable> nextWorkerMessages; + + public InternalApi( + TestGraph<I, V, E> graph, + ImmutableClassesGiraphConfiguration conf, + boolean runAllChecks) { + this.graph = graph; + this.conf = conf; + this.runAllChecks = runAllChecks; + this.globalComm = new InternalAggregators(runAllChecks); + this.aggregators = new AggregatorToGlobalCommTranslation(conf, globalComm); + this.mutations = new ConcurrentHashMap<>(); + this.workerApi = new InternalWorkerApi(); + this.workerApi.setConf(conf); + this.workerApi.setWorkerGlobalCommUsage(this.globalComm); + + this.createVertexOnMsgs = + GiraphConstants.RESOLVER_CREATE_VERTEX_ON_MSGS.get(conf); + workerContextLogic = new BlockWorkerContextLogic(); + } + + /** + * Wrapper for calling Worker API interface. + * Needs to be separate from Master API, since getAggregatedValue + * has different implementation on worker and on master. + */ + class InternalWorkerApi extends WorkerAggregatorDelegator<I, V, E> + implements BlockWorkerSendApi<I, V, E, Writable>, + BlockWorkerReceiveApi<I>, BlockWorkerContextSendApi<Writable>, + BlockWorkerContextReceiveApi, BlockWorkerValueAccessor, + WorkerGlobalCommUsage { + + @Override + public void addVertexRequest(I id, V value) { + addVertexRequest(id, value, conf.createAndInitializeOutEdges()); + } + + @Override + public void addVertexRequest(I id, V value, OutEdges<I, E> edges) { + Vertex<I, V, E> vertex = conf.createVertex(); + vertex.initialize(id, value, edges); + getMutationFor(id).addVertex(vertex); + } + + @Override + public void removeVertexRequest(I vertexId) { + getMutationFor(vertexId).removeVertex(); + } + + @Override + public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) { + getMutationFor(sourceVertexId).addEdge(edge); + } + + @Override + public void removeEdgesRequest(I sourceVertexId, I targetVertexId) { + getMutationFor(sourceVertexId).removeEdge(targetVertexId); + } + + @Override + public void sendMessage(I id, Writable message) { + nextMessages.sendMessage(id, message); + } + + @Override + public void sendMessageToAllEdges( + Vertex<I, V, E> vertex, Writable message) { + sendMessageToMultipleEdges( + new TargetVertexIdIterator<>(vertex), + message); + } + + @Override + public void sendMessageToMultipleEdges( + Iterator<I> vertexIdIterator, Writable message) { + nextMessages.sendMessageToMultipleEdges(vertexIdIterator, message); + } + + @Override + public int getMyWorkerIndex() { + return 0; + } + + @Override + public int getWorkerCount() { + return 1; + } + + @Override + public void sendMessageToWorker(Writable message, int workerIndex) { + Preconditions.checkArgument(workerIndex == getMyWorkerIndex(), + "With just one worker you can only send worker message to itself, " + + "but tried to send to " + workerIndex); + nextWorkerMessages.add(message); + } + + @Override + public Object getWorkerValue() { + return workerContextLogic.getWorkerValue(); + } + + @Override + public long getTotalNumVertices() { + return InternalApi.this.getTotalNumVertices(); + } + + @Override + public long getTotalNumEdges() { + return InternalApi.this.getTotalNumEdges(); + } + + @Override + public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> + OD getOutputDesc(String confOption) { + return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc( + confOption); + } + + @Override + public <OW extends BlockOutputWriter> OW getWriter(String confOption) { + return workerContextLogic.getOutputHandle().getWriter(confOption); + } + } + + @Override + public void broadcast(String name, Writable value) { + globalComm.broadcast(name, value); + } + + @Override + public <T extends Writable> BroadcastHandle<T> broadcast(T object) { + BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>(); + broadcast(handle.getName(), object); + return handle; + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp) { + globalComm.registerReducer(name, reduceOp); + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp, + R globalInitialValue) { + globalComm.registerReducer(name, reduceOp, globalInitialValue); + } + + @Override + public <R extends Writable> R getReduced(String name) { + return globalComm.getReduced(name); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return aggregators.getAggregatedValue(name); + } + + @Override + public <A extends Writable> void setAggregatedValue(String name, A value) { + aggregators.setAggregatedValue(name, value); + } + + @Override + public <A extends Writable> + boolean registerAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass) + throws InstantiationException, IllegalAccessException { + return aggregators.registerAggregator(name, aggregatorClass); + } + + @Override + public <A extends Writable> + boolean registerPersistentAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass) + throws InstantiationException, IllegalAccessException { + return aggregators.registerPersistentAggregator(name, aggregatorClass); + } + + @Override + public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { + return conf; + } + + @Override + public void setStatus(String status) { + } + + @Override + public void progress() { + } + + @Override + public Counter getCounter(final String group, final String name) { + return new Counter() { + @Override + public void increment(long incr) { + } + @Override + public void setValue(long value) { + } + }; + } + + private VertexMutations<I, V, E> getMutationFor(I vertexId) { + VertexMutations<I, V, E> curMutations = new VertexMutations<>(); + VertexMutations<I, V, E> prevMutations = + mutations.putIfAbsent(vertexId, curMutations); + if (prevMutations != null) { + curMutations = prevMutations; + } + return curMutations; + } + + public Iterable takeMessages(I id) { + if (previousMessages != null) { + Iterable result = previousMessages.takeMessages(id); + if (result != null) { + return result; + } + } + return Collections.emptyList(); + } + + public List<Writable> takeWorkerMessages() { + if (previousWorkerMessages != null) { + List<Writable> ret = new ArrayList<>(previousWorkerMessages.size()); + for (Writable message : previousWorkerMessages) { + // Use message copies probabilistically, to catch both not serializing + // some fields, and storing references from message object itself + // (which can be reusable). + ret.add(runAllChecks && ThreadLocalRandom.current().nextBoolean() ? + WritableUtils.createCopy(message) : message); + } + previousWorkerMessages = null; + if (runAllChecks) { + Collections.shuffle(ret); + } + return ret; + } + return Collections.emptyList(); + } + + public void afterWorkerBeforeMaster() { + globalComm.afterWorkerBeforeMaster(); + aggregators.prepareSuperstep(); + } + + public void afterMasterBeforeWorker() { + aggregators.postMasterCompute(); + } + + public void afterMasterBeforeWorker(BlockWorkerPieces computation) { + afterMasterBeforeWorker(); + + previousMessages = nextMessages; + previousWorkerMessages = nextWorkerMessages; + + nextMessages = InternalConcurrentMessageStore.createMessageStore( + conf, computation, runAllChecks); + nextWorkerMessages = new ArrayList<>(); + + // process mutations: + Set<I> targets = previousMessages == null ? + Collections.EMPTY_SET : previousMessages.targetsSet(); + if (createVertexOnMsgs) { + for (I target : targets) { + if (!graph.getVertices().containsKey(target)) { + mutations.put(target, new VertexMutations<I, V, E>()); + } + } + } + + VertexResolver<I, V, E> vertexResolver = conf.createVertexResolver(); + for (Map.Entry<I, VertexMutations<I, V, E>> entry : mutations.entrySet()) { + I vertexIndex = entry.getKey(); + Vertex<I, V, E> originalVertex = graph.getVertex(vertexIndex); + VertexMutations<I, V, E> curMutations = entry.getValue(); + Vertex<I, V, E> vertex = vertexResolver.resolve( + vertexIndex, originalVertex, curMutations, + targets.contains(vertexIndex)); + + if (vertex != null) { + graph.addVertex(vertex); + } else if (originalVertex != null) { + graph.getVertices().remove(originalVertex.getId()); + } + } + mutations.clear(); + } + + public Collection<Vertex<I, V, E>> getAllVertices() { + return graph.getVertices().values(); + } + + public InternalWorkerApi getWorkerApi() { + return workerApi; + } + + @Override + public long getTotalNumEdges() { + int numEdges = 0; + for (Vertex<I, V, E> vertex : graph.getVertices().values()) { + numEdges += vertex.getNumEdges(); + } + return numEdges; + } + + @Override + public long getTotalNumVertices() { + return graph.getVertices().size(); + } + + @Override + public void logToCommandLine(String line) { + System.err.println("Command line: " + line); + } + + @Override + public BlockOutputHandle getBlockOutputHandle() { + return workerContextLogic.getOutputHandle(); + } + + @Override + public <OW extends BlockOutputWriter, + OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) { + return workerContextLogic.getOutputHandle().<OW, OD>getOutputDesc( + confOption); + } + + @Override + public <OW extends BlockOutputWriter> OW getWriter(String confOption) { + return workerContextLogic.getOutputHandle().getWriter(confOption); + } + + + public BlockWorkerContextLogic getWorkerContextLogic() { + return workerContextLogic; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java new file mode 100644 index 0000000..6c0cccb --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/InternalMessageStore.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.local; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.types.ops.TypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.giraph.utils.ExtendedByteArrayDataInput; +import org.apache.giraph.utils.ExtendedByteArrayDataOutput; +import org.apache.giraph.utils.UnsafeReusableByteArrayInput; +import org.apache.giraph.utils.WritableUtils; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.collect.AbstractIterator; + +/** + * Interface for internal message store, used by LocalBlockRunner + * + * @param <I> Vertex id type + * @param <M> Message type + */ +@SuppressWarnings("rawtypes") +interface InternalMessageStore + <I extends WritableComparable, M extends Writable> { + Set<I> targetsSet(); + Iterable<M> takeMessages(I id); + void sendMessage(I id, M message); + void sendMessageToMultipleEdges(Iterator<I> idIter, M message); + + /** + * Abstract Internal message store implementation that uses + * ConcurrentHashMap to store objects received thus far. + * + * @param <I> Vertex id type + * @param <M> Message type + * @param <R> Receiver object that particular implementation uses + * (message, array of messages, byte array, etc) + */ + abstract class InternalConcurrentMessageStore + <I extends WritableComparable, M extends Writable, R> + implements InternalMessageStore<I, M> { + protected final ConcurrentHashMap<I, R> received = + new ConcurrentHashMap<>(); + + private final Class<I> idClass; + private final TypeOps<I> idTypeOps; + + InternalConcurrentMessageStore(Class<I> idClass) { + this.idClass = idClass; + idTypeOps = TypeOpsUtils.getTypeOpsOrNull(idClass); + } + + public I copyId(I id) { + if (idTypeOps != null) { + return idTypeOps.createCopy(id); + } else { + return WritableUtils.createCopy(id, idClass, null); + } + } + + R getReceiverFor(I id) { + R value = received.get(id); + + if (value == null) { + id = copyId(id); + value = createNewReceiver(); + R oldValue = received.putIfAbsent(id, value); + if (oldValue != null) { + value = oldValue; + } + } + return value; + } + + abstract R createNewReceiver(); + + @Override + public Set<I> targetsSet() { + return received.keySet(); + } + + @Override + public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) { + while (idIter.hasNext()) { + sendMessage(idIter.next(), message); + } + } + + public static <I extends WritableComparable, M extends Writable> + InternalMessageStore<I, M> createMessageStore( + final ImmutableClassesGiraphConfiguration<I, ?, ?> conf, + final MessageClasses<I, M> messageClasses) { + MessageCombiner<? super I, M> combiner = + messageClasses.createMessageCombiner(conf); + if (combiner != null) { + return new InternalCombinerMessageStore<>( + conf.getVertexIdClass(), combiner); + } else if (messageClasses.getMessageEncodeAndStoreType().equals( + MessageEncodeAndStoreType.POINTER_LIST_PER_VERTEX)) { + return new InternalSharedByteMessageStore<>( + conf.getVertexIdClass(), + messageClasses.createMessageValueFactory(conf)); + } else { + return new InternalByteMessageStore<>( + conf.getVertexIdClass(), + messageClasses.createMessageValueFactory(conf)); + } + } + + public static <I extends WritableComparable, M extends Writable> + InternalMessageStore<I, M> createMessageStore( + final ImmutableClassesGiraphConfiguration<I, ?, ?> conf, + final BlockWorkerPieces pieces, boolean runAllChecks) { + @SuppressWarnings("unchecked") + MessageClasses<I, M> messageClasses = + pieces.getOutgoingMessageClasses(conf); + + InternalMessageStore<I, M> messageStore = + createMessageStore(conf, messageClasses); + if (runAllChecks) { + return new InternalChecksMessageStore<I, M>( + messageStore, conf, messageClasses.createMessageValueFactory(conf)); + } else { + return messageStore; + } + } + } + + /** + * InternalMessageStore that combines messages as they are received. + * + * @param <I> Vertex id value type + * @param <M> Message type + */ + static class InternalCombinerMessageStore + <I extends WritableComparable, M extends Writable> + extends InternalConcurrentMessageStore<I, M, M> { + private final MessageCombiner<? super I, M> messageCombiner; + + public InternalCombinerMessageStore(Class<I> idClass, + MessageCombiner<? super I, M> messageCombiner) { + super(idClass); + this.messageCombiner = messageCombiner; + } + + @Override + public Iterable<M> takeMessages(I id) { + M message = received.remove(id); + if (message != null) { + return Collections.singleton(message); + } else { + return null; + } + } + + @Override + public void sendMessage(I id, M message) { + M mainMessage = getReceiverFor(id); + synchronized (mainMessage) { + messageCombiner.combine(id, mainMessage, message); + } + } + + @Override + M createNewReceiver() { + return messageCombiner.createInitialMessage(); + } + } + + /** + * InternalMessageStore that keeps messages for each vertex in byte array. + * + * @param <I> Vertex id value type + * @param <M> Message type + */ + static class InternalByteMessageStore + <I extends WritableComparable, M extends Writable> + extends InternalConcurrentMessageStore<I, M, + ExtendedByteArrayDataOutput> { + private final MessageValueFactory<M> messageFactory; + + public InternalByteMessageStore( + Class<I> idClass, MessageValueFactory<M> messageFactory) { + super(idClass); + this.messageFactory = messageFactory; + } + + @Override + public Iterable<M> takeMessages(I id) { + final ExtendedByteArrayDataOutput out = received.remove(id); + if (out == null) { + return null; + } + + return new Iterable<M>() { + @Override + public Iterator<M> iterator() { + final ExtendedByteArrayDataInput in = new ExtendedByteArrayDataInput( + out.getByteArray(), 0, out.getPos()); + final M message = messageFactory.newInstance(); + return new AbstractIterator<M>() { + @Override + protected M computeNext() { + if (in.available() == 0) { + return endOfData(); + } + try { + message.readFields(in); + } catch (IOException e) { + throw new RuntimeException(e); + } + return message; + } + }; + } + }; + } + + @Override + public void sendMessage(I id, M message) { + ExtendedByteArrayDataOutput out = getReceiverFor(id); + + synchronized (out) { + try { + message.write(out); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + @Override + ExtendedByteArrayDataOutput createNewReceiver() { + return new ExtendedByteArrayDataOutput(); + } + } + + /** + * InternalMessageStore that creates byte[] for each message, and + * all receivers share the same byte[]. + * + * @param <I> Vertex id value type + * @param <M> Message type + */ + static class InternalSharedByteMessageStore + <I extends WritableComparable, M extends Writable> + extends InternalConcurrentMessageStore<I, M, List<byte[]>> { + private final MessageValueFactory<M> messageFactory; + + public InternalSharedByteMessageStore( + Class<I> idClass, MessageValueFactory<M> messageFactory) { + super(idClass); + this.messageFactory = messageFactory; + } + + @Override + public Iterable<M> takeMessages(I id) { + final List<byte[]> out = received.remove(id); + if (out == null) { + return null; + } + + return new Iterable<M>() { + @Override + public Iterator<M> iterator() { + final Iterator<byte[]> byteIter = out.iterator(); + final M message = messageFactory.newInstance(); + final UnsafeReusableByteArrayInput reusableInput = + new UnsafeReusableByteArrayInput(); + + return new Iterator<M>() { + @Override + public boolean hasNext() { + return byteIter.hasNext(); + } + + @Override + public M next() { + WritableUtils.fromByteArrayUnsafe( + byteIter.next(), message, reusableInput); + return message; + } + + @Override + public void remove() { + byteIter.remove(); + } + }; + } + }; + } + + private void storeMessage(I id, byte[] messageData) { + List<byte[]> out = getReceiverFor(id); + synchronized (out) { + out.add(messageData); + } + } + + @Override + List<byte[]> createNewReceiver() { + return new ArrayList<>(); + } + + @Override + public void sendMessage(I id, M message) { + storeMessage(id, WritableUtils.toByteArrayUnsafe(message)); + } + + @Override + public void sendMessageToMultipleEdges(Iterator<I> idIter, M message) { + byte[] messageData = WritableUtils.toByteArrayUnsafe(message); + while (idIter.hasNext()) { + storeMessage(idIter.next(), messageData); + } + } + } + + /** + * Message store that add checks for whether serialization seems to be + * working fine + */ + static class InternalChecksMessageStore + <I extends WritableComparable, M extends Writable> + implements InternalMessageStore<I, M> { + private final InternalMessageStore<I, M> messageStore; + private final ImmutableClassesGiraphConfiguration<I, ?, ?> conf; + private final MessageValueFactory<M> messageFactory; + + public InternalChecksMessageStore(InternalMessageStore<I, M> messageStore, + ImmutableClassesGiraphConfiguration<I, ?, ?> conf, + MessageValueFactory<M> messageFactory) { + this.messageStore = messageStore; + this.conf = conf; + this.messageFactory = messageFactory; + } + + // Use message copies probabilistically, to catch both not serializing some + // fields, and storing references from message object itself + // (which can be reusable). + private M maybeMessageCopy(M message) { + M messageCopy = WritableUtils.createCopy( + message, messageFactory, conf); + return ThreadLocalRandom.current().nextBoolean() ? messageCopy : message; + } + + private void checkIdCopy(I id) { + WritableUtils.createCopy(id, conf.getVertexIdFactory(), conf); + } + + @Override + public void sendMessage(I id, M message) { + checkIdCopy(id); + messageStore.sendMessage(id, maybeMessageCopy(message)); + } + + @Override + public void sendMessageToMultipleEdges( + final Iterator<I> idIter, M message) { + messageStore.sendMessageToMultipleEdges( + new Iterator<I>() { + @Override + public boolean hasNext() { + return idIter.hasNext(); + } + + @Override + public I next() { + I id = idIter.next(); + checkIdCopy(id); + return id; + } + + @Override + public void remove() { + idIter.remove(); + } + }, + maybeMessageCopy(message)); + } + + @Override + public Iterable<M> takeMessages(I id) { + checkIdCopy(id); + return messageStore.takeMessages(id); + } + + @Override + public Set<I> targetsSet() { + return messageStore.targetsSet(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java new file mode 100644 index 0000000..bdf3233 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/LocalBlockRunner.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.local; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.giraph.block_app.framework.api.local.InternalApi.InternalWorkerApi; +import org.apache.giraph.block_app.framework.internal.BlockMasterLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.giraph.conf.BooleanConfOption; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.IntConfOption; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.utils.InternalVertexRunner; +import org.apache.giraph.utils.TestGraph; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.Progressable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +/** + * Local in-memory Block application job runner, used for testing. + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class LocalBlockRunner { + public static final IntConfOption NUM_WORKERS = new IntConfOption( + "test.LocalBlockRunner.NUM_WORKERS", 3, ""); + public static final BooleanConfOption RUN_ALL_CHECKS = new BooleanConfOption( + "test.LocalBlockRunner.RUN_ALL_CHECKS", true, ""); + // merge into RUN_ALL_CHECKS, after SERIALIZE_MASTER starts working + public static final BooleanConfOption SERIALIZE_MASTER = + new BooleanConfOption( + "test.LocalBlockRunner.SERIALIZE_MASTER", false, ""); + + private LocalBlockRunner() { } + + /** + * With a boolean flag, you can switch between LocalBlockRunner and + * InternalVertexRunner for running the unit test. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + TestGraph<I, V, E> runWithInMemoryOutput( + TestGraph<I, V, E> graph, GiraphConfiguration conf, + boolean useFullDigraphTests) throws Exception { + if (useFullDigraphTests) { + return InternalVertexRunner.runWithInMemoryOutput(conf, graph); + } else { + runWithInMemoryOutput(graph, conf); + return graph; + } + } + + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + void runWithInMemoryOutput( + TestGraph<I, V, E> graph, GiraphConfiguration conf) throws Exception { + VertexSaver<I, V, E> noOpVertexSaver = new VertexSaver<I, V, E>() { + @Override + public void saveVertex(Vertex<I, V, E> vertex) { + // No-op + } + }; + runWithVertexSaverOutput(graph, noOpVertexSaver, conf); + } + + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + void runWithVertexSaverOutput( + TestGraph<I, V, E> graph, final VertexSaver<I, V, E> vertexSaver, + GiraphConfiguration conf) throws Exception { + int numWorkers = NUM_WORKERS.get(conf); + boolean runAllChecks = RUN_ALL_CHECKS.get(conf); + boolean serializeMaster = SERIALIZE_MASTER.get(conf); + final boolean doOutputDuringComputation = conf.doOutputDuringComputation(); + + ImmutableClassesGiraphConfiguration<I, V, E> immConf = + new ImmutableClassesGiraphConfiguration(conf); + final InternalApi internalApi = + new InternalApi(graph, immConf, runAllChecks); + final InternalWorkerApi internalWorkerApi = internalApi.getWorkerApi(); + + BlockMasterLogic<Object> blockMasterLogic = new BlockMasterLogic<>(); + blockMasterLogic.initialize(immConf, internalApi); + + BlockWorkerContextLogic workerContextLogic = + internalApi.getWorkerContextLogic(); + workerContextLogic.preApplication(internalWorkerApi, + new BlockOutputHandle("", conf, new Progressable() { + @Override + public void progress() { + } + })); + + ExecutorService executor = Executors.newFixedThreadPool(numWorkers); + Random rand = new Random(); + + if (runAllChecks) { + for (Vertex<I, V, E> vertex : graph) { + V value = immConf.createVertexValue(); + WritableUtils.copyInto(vertex.getValue(), value); + vertex.setValue(value); + + vertex.setEdges((Iterable) WritableUtils.createCopy( + (Writable) vertex.getEdges(), immConf.getOutEdgesClass(), immConf)); + } + } + + final AtomicBoolean anyVertexAlive = new AtomicBoolean(true); + + for (int superstep = 0;; superstep++) { + // serialize master to test continuable computation + if (serializeMaster) { + blockMasterLogic = (BlockMasterLogic) WritableUtils.createCopy( + new KryoWritableWrapper<>(blockMasterLogic), + KryoWritableWrapper.class, + immConf).get(); + blockMasterLogic.initializeAfterRead(internalApi); + } + + if (!anyVertexAlive.get()) { + break; + } + + final BlockWorkerPieces workerPieces = + blockMasterLogic.computeNext(superstep); + if (workerPieces == null) { + if (!conf.doOutputDuringComputation()) { + Collection<Vertex<I, V, E>> vertices = internalApi.getAllVertices(); + for (Vertex<I, V, E> vertex : vertices) { + vertexSaver.saveVertex(vertex); + } + } + int left = executor.shutdownNow().size(); + Preconditions.checkState(0 == left, "Some work still left to be done?"); + break; + } else { + internalApi.afterMasterBeforeWorker(workerPieces); + List<List<Vertex<I, V, E>>> verticesPerWorker = new ArrayList<>(); + for (int i = 0; i < numWorkers; i++) { + verticesPerWorker.add(new ArrayList<Vertex<I, V, E>>()); + } + Collection<Vertex<I, V, E>> allVertices = internalApi.getAllVertices(); + for (Vertex<I, V, E> vertex : allVertices) { + verticesPerWorker.get(rand.nextInt(numWorkers)).add(vertex); + } + + workerContextLogic.preSuperstep( + internalWorkerApi, + internalWorkerApi, + KryoWritableWrapper.wrapAndCopy(workerPieces), superstep, + internalApi.takeWorkerMessages()); + + final CountDownLatch latch = new CountDownLatch(numWorkers); + final AtomicReference<Throwable> exception = new AtomicReference<>(); + anyVertexAlive.set(false); + for (final List<Vertex<I, V, E>> curVertices : verticesPerWorker) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + boolean anyCurVertexAlive = false; + BlockWorkerPieces localPieces = + KryoWritableWrapper.wrapAndCopy(workerPieces); + + BlockWorkerLogic localLogic = new BlockWorkerLogic(localPieces); + localLogic.preSuperstep(internalWorkerApi, internalWorkerApi); + + for (Vertex<I, V, E> vertex : curVertices) { + Iterable messages = internalApi.takeMessages(vertex.getId()); + if (vertex.isHalted() && !Iterables.isEmpty(messages)) { + vertex.wakeUp(); + } + if (!vertex.isHalted()) { + localLogic.compute(vertex, messages); + if (doOutputDuringComputation) { + vertexSaver.saveVertex(vertex); + } + } + + if (!vertex.isHalted()) { + anyCurVertexAlive = true; + } + } + + if (anyCurVertexAlive) { + anyVertexAlive.set(true); + } + localLogic.postSuperstep(); + // CHECKSTYLE: stop IllegalCatch + // Need to propagate all exceptions within test + } catch (Throwable t) { + // CHECKSTYLE: resume IllegalCatch + t.printStackTrace(); + exception.set(t); + } + + latch.countDown(); + } + }); + } + + latch.await(); + if (exception.get() != null) { + throw new RuntimeException("Worker failed", exception.get()); + } + + workerContextLogic.postSuperstep(); + + internalApi.afterWorkerBeforeMaster(); + } + } + + workerContextLogic.postApplication(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java new file mode 100644 index 0000000..0053644 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/VertexSaver.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.local; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface to use for saving vertices + * + * @param <I> Vertex id + * @param <V> Vertex value + * @param <E> Edge value + */ +public interface VertexSaver<I extends WritableComparable, V extends Writable, + E extends Writable> { + void saveVertex(Vertex<I, V, E> vertex); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java new file mode 100644 index 0000000..c9fe578 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/local/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Local (single machine) implementation of graph processing system API used by + * Blocks Framework. + * + * Allows efficient execution of Block Applications on small graphs, as well as + * comprehensive set of optional checks helping with unit tests. + */ +package org.apache.giraph.block_app.framework.api.local; http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java index e101b01..a3af507 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/SendMessageCache.java @@ -175,7 +175,8 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable> * An iterator wrapper on edges to return * target vertex ids. */ - private class TargetVertexIdIterator implements Iterator<I> { + public static class TargetVertexIdIterator<I extends WritableComparable> + implements Iterator<I> { /** An edge iterator */ private final Iterator<Edge<I, Writable>> edgesIterator; @@ -184,7 +185,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable> * * @param vertex The source vertex of the out edges */ - private TargetVertexIdIterator(Vertex<I, ?, ?> vertex) { + public TargetVertexIdIterator(Vertex<I, ?, ?> vertex) { edgesIterator = ((Vertex<I, Writable, Writable>) vertex).getEdges().iterator(); } @@ -201,7 +202,7 @@ public class SendMessageCache<I extends WritableComparable, M extends Writable> @Override public void remove() { - // No operation. + throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/79e7f1c9/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java index 0f6e73f..f17955b 100644 --- a/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java +++ b/giraph-core/src/main/java/org/apache/giraph/writable/kryo/KryoWritableWrapper.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.Writable; /** @@ -107,4 +108,16 @@ public class KryoWritableWrapper<T> implements Writable { return (T) value; } } + + /** + * Wrap object with KryoWritableWrapper, create a writable copy of it, + * and then unwrap it, allowing any object to be copied. + * + * @param object Object to copy + * @return copy of the object + * @param <T> Type of the object + */ + public static <T> T wrapAndCopy(T object) { + return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get(); + } }
