http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java new file mode 100644 index 0000000..ede6005 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterApiWrapper.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.giraph; + +import org.apache.giraph.aggregators.Aggregator; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockOutputApi; +import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor; +import org.apache.giraph.block_app.framework.api.Counter; +import org.apache.giraph.block_app.framework.output.BlockOutputDesc; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.giraph.block_app.framework.output.BlockOutputWriter; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.internal.ReducersForPieceHandler.BroadcastHandleImpl; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.hadoop.io.Writable; + +/** + * Giraph implementation of BlockMasterApi, that delegates all calls + * to MasterCompute. + */ +final class BlockMasterApiWrapper implements BlockMasterApi, + BlockOutputApi, BlockOutputHandleAccessor { + private final MasterCompute master; + private final BlockOutputHandle outputHandle; + + public BlockMasterApiWrapper(MasterCompute master, + BlockOutputHandle outputHandle) { + this.master = master; + this.outputHandle = outputHandle; + } + + @Override + public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() { + return master.getConf(); + } + + @Override + public void setStatus(String status) { + master.getContext().setStatus(status); + } + + @Override + public void progress() { + master.getContext().progress(); + } + + @Override + public Counter getCounter(String group, String name) { + final org.apache.hadoop.mapreduce.Counter counter = + master.getContext().getCounter(group, name); + return new Counter() { + @Override + public void increment(long incr) { + counter.increment(incr); + } + + @Override + public void setValue(long value) { + counter.setValue(value); + } + }; + } + + @Override + public <R extends Writable> R getReduced(String name) { + return master.getReduced(name); + } + + @Override + public void broadcast(String name, Writable value) { + master.broadcast(name, value); + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp) { + master.registerReducer(name, reduceOp); + } + + @Override + public <S, R extends Writable> void registerReducer( + String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) { + master.registerReducer(name, reduceOp, globalInitialValue); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return master.getAggregatedValue(name); + } + + @Override + public <A extends Writable> + boolean registerAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass + ) throws InstantiationException, IllegalAccessException { + return master.registerAggregator(name, aggregatorClass); + } + + @Override + public <A extends Writable> + boolean registerPersistentAggregator( + String name, Class<? extends Aggregator<A>> aggregatorClass + ) throws InstantiationException, + IllegalAccessException { + return master.registerPersistentAggregator(name, aggregatorClass); + } + + @Override + public <A extends Writable> void setAggregatedValue(String name, A value) { + master.setAggregatedValue(name, value); + } + + @Override + public <T extends Writable> BroadcastHandle<T> broadcast(T object) { + BroadcastHandleImpl<T> handle = new BroadcastHandleImpl<>(); + master.broadcast(handle.getName(), object); + return handle; + } + + @Override + @Deprecated + public long getTotalNumEdges() { + return master.getTotalNumEdges(); + } + + @Override + @Deprecated + public long getTotalNumVertices() { + return master.getTotalNumVertices(); + } + + @Override + public void logToCommandLine(String line) { + master.logToCommandLine(line); + } + + @Override + public <OW extends BlockOutputWriter, + OD extends BlockOutputDesc<OW>> OD getOutputDesc(String confOption) { + return outputHandle.<OW, OD>getOutputDesc(confOption); + } + + @Override + public <OW extends BlockOutputWriter> OW getWriter(String confOption) { + return outputHandle.getWriter(confOption); + } + + @Override + public BlockOutputHandle getBlockOutputHandle() { + return outputHandle; + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java new file mode 100644 index 0000000..69cf9f8 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockMasterCompute.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.giraph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.block_app.framework.internal.BlockMasterLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; + +/** + * MasterCompute class which executes block computation. + * + * @param <S> Execution stage type + */ +public final class BlockMasterCompute<S> extends MasterCompute { + private BlockMasterLogic<S> blockMasterLogic = new BlockMasterLogic<>(); + + @Override + public void initialize() throws InstantiationException, + IllegalAccessException { + blockMasterLogic.initialize(getConf(), new BlockMasterApiWrapper(this, + new BlockOutputHandle(getContext().getJobID().toString(), + getConf(), getContext()))); + } + + @Override + public void compute() { + BlockWorkerPieces<S> workerPieces = + blockMasterLogic.computeNext(getSuperstep()); + if (workerPieces == null) { + haltComputation(); + } else { + BlockWorkerPieces.setNextWorkerPieces(this, workerPieces); + } + } + + @Override + public void write(DataOutput out) throws IOException { + new KryoWritableWrapper<>(blockMasterLogic).write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + KryoWritableWrapper<BlockMasterLogic<S>> object = + new KryoWritableWrapper<>(); + object.readFields(in); + blockMasterLogic = object.get(); + blockMasterLogic.initializeAfterRead(new BlockMasterApiWrapper(this, + new BlockOutputHandle(getContext().getJobID().toString(), + getConf(), getContext()))); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java new file mode 100644 index 0000000..6e839f9 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerApiWrapper.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.giraph; + +import java.io.IOException; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.api.BlockOutputApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerValueAccessor; +import org.apache.giraph.block_app.framework.output.BlockOutputDesc; +import org.apache.giraph.block_app.framework.output.BlockOutputWriter; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.edge.Edge; +import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.graph.Computation; +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Giraph implementation of BlockWorkerReceiveApi and BlockWorkerSendAPI, + * passing all calls to Computation. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message type + */ +@SuppressWarnings("rawtypes") +final class BlockWorkerApiWrapper<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + implements BlockWorkerReceiveApi<I>, BlockWorkerSendApi<I, V, E, M>, + BlockWorkerValueAccessor, WorkerGlobalCommUsage, BlockOutputApi { + private final Computation<I, V, E, NoMessage, M> worker; + + public BlockWorkerApiWrapper(Computation<I, V, E, NoMessage, M> worker) { + this.worker = worker; + } + + @Override + public ImmutableClassesGiraphConfiguration<I, V, E> getConf() { + return worker.getConf(); + } + + @Override + public <A extends Writable> void aggregate(String name, A value) { + worker.aggregate(name, value); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return worker.getAggregatedValue(name); + } + + @Override + public <B extends Writable> B getBroadcast(String name) { + return worker.getBroadcast(name); + } + + @Override + public void reduce(String name, Object value) { + worker.reduce(name, value); + } + + @Override + public void reduceMerge(String name, Writable value) { + worker.reduceMerge(name, value); + } + + @Override + public void sendMessage(I id, M message) { + worker.sendMessage(id, message); + } + + @Override + public void sendMessageToAllEdges(Vertex<I, V, E> vertex, M message) { + worker.sendMessageToAllEdges(vertex, message); + } + + @Override + public void sendMessageToMultipleEdges( + Iterator<I> vertexIdIterator, M message) { + worker.sendMessageToMultipleEdges(vertexIdIterator, message); + } + + @Override + public void addVertexRequest(I id, V value) { + try { + worker.addVertexRequest(id, value); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void addVertexRequest(I id, V value, OutEdges<I, E> edges) { + try { + worker.addVertexRequest(id, value, edges); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeVertexRequest(I vertexId) { + try { + worker.removeVertexRequest(vertexId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void addEdgeRequest(I sourceVertexId, Edge<I, E> edge) { + try { + worker.addEdgeRequest(sourceVertexId, edge); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeEdgesRequest(I sourceVertexId, I targetVertexId) { + try { + worker.removeEdgesRequest(sourceVertexId, targetVertexId); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private BlockWorkerContext getBlockWorkerContext() { + return (BlockWorkerContext) worker.getWorkerContext(); + } + + @Override + public Object getWorkerValue() { + return getBlockWorkerContext().getWorkerValue(); + } + + @Override + public long getTotalNumEdges() { + return worker.getTotalNumEdges(); + } + + @Override + public long getTotalNumVertices() { + return worker.getTotalNumVertices(); + } + + @Override + public <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> + OD getOutputDesc(String confOption) { + return getBlockWorkerContext().getOutputHandle().<OW, OD>getOutputDesc( + confOption); + } + + @Override + public <OW extends BlockOutputWriter> OW getWriter(String confOption) { + return getBlockWorkerContext().getOutputHandle().getWriter(confOption); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java new file mode 100644 index 0000000..1a4f8d8 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContext.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.giraph; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.giraph.block_app.framework.internal.BlockWorkerContextLogic; +import org.apache.giraph.block_app.framework.internal.BlockWorkerPieces; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.giraph.worker.WorkerContext; +import org.apache.giraph.writable.kryo.HadoopKryo; +import org.apache.giraph.writable.kryo.markers.KryoIgnoreWritable; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; + +/** + * WorkerContext that executes receiver and sender blocks passed + * into BlockWorkerPieces. + */ +public final class BlockWorkerContext extends WorkerContext + implements KryoIgnoreWritable { + public static final Logger LOG = Logger.getLogger(BlockWorkerContext.class); + + private BlockWorkerContextLogic workerLogic; + + @Override + public void preApplication() + throws InstantiationException, IllegalAccessException { + workerLogic = new BlockWorkerContextLogic(); + workerLogic.preApplication(new BlockWorkerContextApiWrapper<>(this), + new BlockOutputHandle(getContext().getJobID().toString(), + getConf(), getContext())); + } + + @Override + public void preSuperstep() { + List<Writable> messages = getAndClearMessagesFromOtherWorkers(); + BlockWorkerContextApiWrapper<Writable> workerApi = + new BlockWorkerContextApiWrapper<>(this); + BlockWorkerPieces<Object> workerPieces = + BlockWorkerPieces.getNextWorkerPieces(this); + + LOG.info("PassedComputation in " + getSuperstep() + + " superstep executing " + workerPieces); + + workerLogic.preSuperstep( + workerApi, workerApi, workerPieces, getSuperstep(), messages); + } + + @Override + public void postSuperstep() { + workerLogic.postSuperstep(); + } + + @Override + public void postApplication() { + workerLogic.postApplication(); + } + + public Object getWorkerValue() { + return workerLogic.getWorkerValue(); + } + + public BlockOutputHandle getOutputHandle() { + return workerLogic.getOutputHandle(); + } + + // Cannot extend KryoWritable directly, since WorkerContext is + // abstract class, not interface... Additionally conf in parent + // class cannot be made transient. + // So just add serialization of two individual fields. + // (and adding KryoIgnoreWritable to avoid wrapping it twice) + + @Override + public void write(DataOutput out) throws IOException { + HadoopKryo.writeClassAndObject(out, workerLogic); + } + + @Override + public void readFields(DataInput in) throws IOException { + workerLogic = HadoopKryo.readClassAndObject(in); + workerLogic.getOutputHandle().initialize(getConf(), getContext()); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java new file mode 100644 index 0000000..c52b6a5 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/BlockWorkerContextApiWrapper.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.api.giraph; + +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.worker.WorkerContext; +import org.apache.hadoop.io.Writable; + +/** + * Giraph implementation of BlockWorkerContextReceiveApi and + * BlockWorkerContextSendApi, passing all calls to WorkerContext. + * + * @param <WM> Worker message type + */ +final class BlockWorkerContextApiWrapper<WM extends Writable> + implements BlockWorkerContextReceiveApi, BlockWorkerContextSendApi<WM> { + private final WorkerContext workerContext; + + public BlockWorkerContextApiWrapper(WorkerContext workerContext) { + this.workerContext = workerContext; + } + + @Override + public ImmutableClassesGiraphConfiguration<?, ?, ?> getConf() { + return workerContext.getConf(); + } + + @Override + public int getWorkerCount() { + return workerContext.getWorkerCount(); + } + + @Override + public int getMyWorkerIndex() { + return workerContext.getMyWorkerIndex(); + } + + @Override + public <A extends Writable> A getAggregatedValue(String name) { + return workerContext.getAggregatedValue(name); + } + + @Override + public <A extends Writable> void aggregate(String name, A value) { + workerContext.aggregate(name, value); + } + + @Override + public void sendMessageToWorker(WM message, int workerIndex) { + workerContext.sendMessageToWorker(message, workerIndex); + } + + @Override + public <B extends Writable> B getBroadcast(String name) { + return workerContext.getBroadcast(name); + } + + @Override + public long getTotalNumEdges() { + return workerContext.getTotalNumEdges(); + } + + @Override + public long getTotalNumVertices() { + return workerContext.getTotalNumVertices(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java new file mode 100644 index 0000000..e20fb8e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/giraph/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Giraph implementation of graph processing system API used by + * Blocks Framework. + */ +package org.apache.giraph.block_app.framework.api.giraph; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java new file mode 100644 index 0000000..c10e5d2 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/api/package-info.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Interfaces representing full API to the underlying graph processing system. + * + * Framework implementation is fully contained within package + * org.apache.giraph.block_app.framework, given implementation of interfaces + * defined here. + * + * He have two such implementations: + * - one relying on Giraph, distributed graph processing system, + * connecting all methods to it's internals + * - one having a fully contained local implementation, executing applications + * on a single machine. Avoiding overheads of Giraph being distributed, + * it allows very efficient evaluation on small graphs, especially useful for + * fast unit tests. + * + * You could potentially use a different graph processing system, to execute + * any Block Application, by implementing these interfaces. + */ +package org.apache.giraph.block_app.framework.api; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java new file mode 100644 index 0000000..6d5287c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/Block.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; + +/** + * Composable unit of execution. Used to combine other Blocks into + * bigger units. Each Piece represents a Block itself. + * + * Execution is represented as an iterator across Pieces. + * + * The whole application run is represented by a single block at the end. + */ +@SuppressWarnings("rawtypes") +public interface Block extends Iterable<AbstractPiece> { + /** + * Create iterator representing all pieces needed to be executed + * in this block. + * + * After Iterator.next call returns, master compute of returned Piece is + * guaranteed to be called before calling hasNext/next on the iterator. + * (allows for iterators logic to depend on the execution dynamically, + * and not be only static) + */ + @Override + Iterator<AbstractPiece> iterator(); + + /** + * Calls consumer for each Piece: + * - in no particular order + * - potentially calling multiple times on same Piece + * - even if Piece might never be returned in the iterator + * - it will be called at least once for every piece that is + * going to be returned by iterator + * + * Can be used for static analysis/introspection of the block, + * without actually executing them. + */ + void forAllPossiblePieces(Consumer<AbstractPiece> consumer); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java new file mode 100644 index 0000000..1a57402 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/EmptyBlock.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Collections; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; + +/** + * Block without any pieces + */ +@SuppressWarnings("rawtypes") +public final class EmptyBlock implements Block { + @Override + public Iterator<AbstractPiece> iterator() { + return Collections.emptyIterator(); + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java new file mode 100644 index 0000000..5631417 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/FilteringBlock.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.delegate.FilteringPiece; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.vertex.SupplierFromVertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Function; +import com.google.common.collect.Iterators; + +/** + * Block which filters out calls to vertexSend/vertexReceive functions + * of all pieces in a given block. + * Filtering happens based on toCallSend and toCallReceive suppliers + * that are passed in, as every piece is just wrapped with FilteringPiece. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public final class FilteringBlock<I extends WritableComparable, + V extends Writable, E extends Writable> + implements Block { + private final SupplierFromVertex<I, V, E, Boolean> toCallSend; + private final SupplierFromVertex<I, V, E, Boolean> toCallReceive; + private final Block block; + + /** + * Creates filtering block which uses passed {@code toCallSend} to filter + * calls to {@code vertexSend}, and passed {@code toCallReceive} to filter + * calls to {@code vertexReceive}, on all pieces within passed {@code block}. + */ + public FilteringBlock( + SupplierFromVertex<I, V, E, Boolean> toCallSend, + SupplierFromVertex<I, V, E, Boolean> toCallReceive, + Block block) { + this.toCallSend = toCallSend; + this.toCallReceive = toCallReceive; + this.block = block; + } + + /** + * Creates filtering block, where both vertexSend and vertexReceive is + * filtered based on same supplier. + */ + public FilteringBlock( + SupplierFromVertex<I, V, E, Boolean> toCallSendAndReceive, Block block) { + this(toCallSendAndReceive, toCallSendAndReceive, block); + } + + /** + * Creates filtering block, that filters only vertexReceive function, + * and always calls vertexSend function. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + Block createReceiveFiltering( + SupplierFromVertex<I, V, E, Boolean> toCallReceive, + Block innerBlock) { + return new FilteringBlock<>(null, toCallReceive, innerBlock); + } + + /** + * Creates filtering block, that filters only vertexSend function, + * and always calls vertexReceive function. + */ + public static + <I extends WritableComparable, V extends Writable, E extends Writable> + Block createSendFiltering( + SupplierFromVertex<I, V, E, Boolean> toCallSend, + Block innerBlock) { + return new FilteringBlock<>(toCallSend, null, innerBlock); + } + + @Override + public Iterator<AbstractPiece> iterator() { + return Iterators.transform( + block.iterator(), + new Function<AbstractPiece, AbstractPiece>() { + @Override + public AbstractPiece apply(AbstractPiece input) { + return new FilteringPiece<>(toCallSend, toCallReceive, input); + } + }); + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + block.forAllPossiblePieces(consumer); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java new file mode 100644 index 0000000..e73392d --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/IfBlock.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.Supplier; + +/** + * Block that executes one of two branches based on a condition + */ +@SuppressWarnings("rawtypes") +public final class IfBlock implements Block { + private final Block thenBlock; + private final Block elseBlock; + private final Supplier<Boolean> condition; + + public IfBlock( + Supplier<Boolean> condition, Block thenBlock, Block elseBlock) { + this.condition = condition; + this.thenBlock = thenBlock; + this.elseBlock = elseBlock; + } + + public IfBlock(Supplier<Boolean> condition, Block thenBlock) { + this.condition = condition; + this.thenBlock = thenBlock; + this.elseBlock = new EmptyBlock(); + } + + @Override + public Iterator<AbstractPiece> iterator() { + if (Boolean.TRUE.equals(condition.get())) { + return thenBlock.iterator(); + } else { + return elseBlock.iterator(); + } + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + thenBlock.forAllPossiblePieces(consumer); + elseBlock.forAllPossiblePieces(consumer); + } + + @Override + public String toString() { + if (elseBlock instanceof EmptyBlock) { + return "IfBlock(" + thenBlock + ")"; + } + return "IfBlock(" + thenBlock + " , " + elseBlock + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java new file mode 100644 index 0000000..9f4f4a0 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatBlock.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Collections; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.primitive.IntSupplier; + +import com.google.common.collect.Iterables; + +/** + * Block that repeats another block given number of times. + */ +@SuppressWarnings("rawtypes") +public final class RepeatBlock implements Block { + private final Block block; + private final IntSupplier repeatTimes; + + public RepeatBlock(final int repeatTimes, Block block) { + this.block = block; + this.repeatTimes = new IntSupplier() { + @Override + public int get() { + return repeatTimes; + } + }; + } + + /** + * Creates a repeat block, that before starting execution takes number of + * iterations from the given supplier. + * + * This allows number of iterations to be dynamic, and depend on + * execution that happens before. + * Note - it doesn't allow for number of repetitions to change during the + * loop itself - as it is supplier is called only when this block gets + * its turn. + */ + public RepeatBlock(IntSupplier repeatTimes, Block block) { + this.block = block; + this.repeatTimes = repeatTimes; + } + + /** + * Create a repeat block that executes unlimited number of times. + * + * Should rarely be used, as it will cause application never to finish, + * unless other unconventional ways of termination are used. + */ + public static Block unlimited(Block block) { + return new RepeatBlock(Integer.MAX_VALUE, block); + } + + @Override + public Iterator<AbstractPiece> iterator() { + return Iterables.concat( + Collections.nCopies(repeatTimes.get(), block)).iterator(); + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + block.forAllPossiblePieces(consumer); + } + + @Override + public String toString() { + return "RepeatBlock(" + repeatTimes + " * " + block + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java new file mode 100644 index 0000000..13e8833 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/RepeatUntilBlock.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Collections; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; +import org.apache.giraph.function.Supplier; + +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterables; + +/** + * Block that repeats another block until toQuit supplier returns true, + * but at most given number of times. + * + * If toQuit returns true on first run, block is not going + * to be executed at all. + */ +@SuppressWarnings("rawtypes") +public final class RepeatUntilBlock implements Block { + private final Block block; + private final int repeatTimes; + private final Supplier<Boolean> toQuit; + + public RepeatUntilBlock( + int repeatTimes, Block block, Supplier<Boolean> toQuit) { + this.block = block; + this.repeatTimes = repeatTimes; + this.toQuit = toQuit; + } + + /** + * Repeat unlimited number of times, until toQuit supplier returns true. + */ + public static Block unlimited(Block block, Supplier<Boolean> toQuit) { + return new RepeatUntilBlock(Integer.MAX_VALUE, block, toQuit); + } + + @Override + public Iterator<AbstractPiece> iterator() { + // nCopies uses constant memory, creating a looped list with single element + final Iterator<AbstractPiece> repeatIterator = + Iterables.concat(Collections.nCopies(repeatTimes, block)).iterator(); + return new AbstractIterator<AbstractPiece>() { + @Override + protected AbstractPiece computeNext() { + if (Boolean.TRUE.equals(toQuit.get()) || !repeatIterator.hasNext()) { + return endOfData(); + } + + return repeatIterator.next(); + } + }; + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + block.forAllPossiblePieces(consumer); + } + + @Override + public String toString() { + return "RepeatUntilBlock(" + repeatTimes + " * " + block + ")"; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java new file mode 100644 index 0000000..d768f0b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/SequenceBlock.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.function.Consumer; + +import com.google.common.collect.Iterables; + +/** + * Block that executes provided blocks sequentially. + */ +@SuppressWarnings("rawtypes") +public final class SequenceBlock implements Block { + private final Block[] blocks; + + public SequenceBlock(Block... blocks) { + this.blocks = blocks.clone(); + } + + public SequenceBlock(List<? extends Block> blocks) { + this.blocks = blocks.toArray(new Block[blocks.size()]); + } + + @Override + public Iterator<AbstractPiece> iterator() { + return Iterables.concat(Arrays.asList(blocks)).iterator(); + } + + @Override + public void forAllPossiblePieces(Consumer<AbstractPiece> consumer) { + for (Block block : blocks) { + block.forAllPossiblePieces(consumer); + } + } + + @Override + public String toString() { + return "SequenceBlock" + Arrays.toString(blocks); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java new file mode 100644 index 0000000..64adc35 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/block/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Block interface as a composable unit of execution, and its common + * implementations. + */ +package org.apache.giraph.block_app.framework.block; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java new file mode 100644 index 0000000..6a2fb39 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockCounters.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import java.lang.reflect.Field; + +import org.apache.giraph.block_app.framework.api.StatusReporter; + +/** Utility class for Blocks Framework related counters */ +public class BlockCounters { + public static final String GROUP = "Blocks Framework"; + + private BlockCounters() { } + + /** + * Takes all fields from stage object, and puts them into counters, + * if possible. + * Only fields that are convertible to long via widening are set + * (i.e. long/int/short/byte) + */ + public static void setStageCounters( + String prefix, Object stage, StatusReporter reporter) { + if (stage != null && reporter != null) { + Class<?> clazz = stage.getClass(); + + while (clazz != null) { + Field[] fields = clazz.getDeclaredFields(); + + Field.setAccessible(fields, true); + for (Field field : fields) { + try { + long value = field.getLong(stage); + reporter.getCounter( + GROUP, prefix + field.getName()).setValue(value); + + // CHECKSTYLE: stop EmptyBlock - ignore any exceptions + } catch (IllegalArgumentException | IllegalAccessException e) { + } + // CHECKSTYLE: resume EmptyBlock + } + clazz = clazz.getSuperclass(); + } + } + } + + public static void setMasterTimeCounter( + PairedPieceAndStage<?> masterPiece, long superstep, + long millis, StatusReporter reporter) { + reporter.getCounter( + GROUP + " Master Timers", + String.format( + "In %6.1f %s (s)", superstep - 0.5, masterPiece.getPiece()) + ).setValue(millis / 1000); + } + + public static void setWorkerTimeCounter( + BlockWorkerPieces<?> workerPieces, long superstep, + long millis, StatusReporter reporter) { + reporter.getCounter( + GROUP + " Worker Timers", + String.format("In %6d %s (s)", superstep, workerPieces.toStringShort()) + ).setValue(millis / 1000); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java new file mode 100644 index 0000000..3b87372 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockMasterLogic.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.BlockFactory; +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockOutputHandleAccessor; +import org.apache.giraph.block_app.framework.block.Block; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.function.Consumer; +import org.apache.log4j.Logger; +import org.python.google.common.base.Preconditions; + +/** + * Block execution logic on master, iterating over Pieces of the + * application Block, executing master logic, and providing what needs to be + * executed on the workers. + * + * @param <S> Execution stage type + */ +@SuppressWarnings("rawtypes") +public class BlockMasterLogic<S> { + private static final Logger LOG = Logger.getLogger(BlockMasterLogic.class); + + private Iterator<AbstractPiece> pieceIterator; + private PairedPieceAndStage<S> previousPiece; + private transient BlockMasterApi masterApi; + private long lastTimestamp = -1; + private BlockWorkerPieces previousWorkerPieces; + private boolean computationDone; + + public void initialize( + GiraphConfiguration conf, final BlockMasterApi masterApi) + throws InstantiationException, IllegalAccessException { + this.masterApi = masterApi; + this.computationDone = false; + + BlockFactory<S> factory = BlockUtils.createBlockFactory(conf); + Block executionBlock = factory.createBlock(conf); + LOG.info("Executing application - " + executionBlock); + + // We register all possible aggregators at the beginning + executionBlock.forAllPossiblePieces(new Consumer<AbstractPiece>() { + private final HashSet<AbstractPiece> registeredPieces = new HashSet<>(); + @SuppressWarnings("deprecation") + @Override + public void apply(AbstractPiece piece) { + // no need to regiser the same piece twice. + if (registeredPieces.add(piece)) { + try { + piece.registerAggregators(masterApi); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + }); + + pieceIterator = executionBlock.iterator(); + // Invariant is that ReceiveWorkerPiece of previousPiece has already been + // executed and that previousPiece.nextExecutionStage() should be used for + // iterating. So passing piece as null, and initial state as current state, + // so that nothing get's executed in first half, and calculateNextState + // returns initial state. + previousPiece = new PairedPieceAndStage<>( + null, factory.createExecutionStage(conf)); + } + + /** + * Initialize object after deserializing it. + * BlockMasterApi is not serializable, so it is transient, and set via this + * method afterwards. + */ + public void initializeAfterRead(BlockMasterApi masterApi) { + this.masterApi = masterApi; + } + + /** + * Executes operations on master (master compute and registering reducers), + * and calculates next pieces to be exectued on workers. + * + * @param superstep Current superstep + * @return Next BlockWorkerPieces to be executed on workers, or null + * if computation should be halted. + */ + public BlockWorkerPieces<S> computeNext(long superstep) { + long beforeMaster = System.currentTimeMillis(); + if (lastTimestamp != -1) { + BlockCounters.setWorkerTimeCounter( + previousWorkerPieces, superstep - 1, + beforeMaster - lastTimestamp, masterApi); + } + + if (previousPiece == null) { + postApplication(); + return null; + } else { + LOG.info( + "Master executing " + previousPiece + ", in superstep " + superstep); + previousPiece.masterCompute(masterApi); + ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle(). + returnAllWriters(); + long afterMaster = System.currentTimeMillis(); + + if (previousPiece.getPiece() != null) { + BlockCounters.setMasterTimeCounter( + previousPiece, superstep, afterMaster - beforeMaster, masterApi); + } + + PairedPieceAndStage<S> nextPiece; + if (pieceIterator.hasNext()) { + nextPiece = new PairedPieceAndStage<S>( + pieceIterator.next(), previousPiece.nextExecutionStage()); + nextPiece.registerReducers(masterApi); + } else { + nextPiece = null; + } + BlockCounters.setStageCounters( + "Master finished stage: ", previousPiece.getExecutionStage(), + masterApi); + LOG.info( + "Master passing next " + nextPiece + ", in superstep " + superstep); + + // if there is nothing more to compute, no need for additional superstep + // this can only happen if application uses no pieces. + BlockWorkerPieces<S> result; + if (previousPiece.getPiece() == null && nextPiece == null) { + postApplication(); + result = null; + } else { + result = new BlockWorkerPieces<>(previousPiece, nextPiece); + LOG.info("Master in " + superstep + " superstep passing " + + result + " to be executed"); + } + + previousPiece = nextPiece; + lastTimestamp = afterMaster; + previousWorkerPieces = result; + return result; + } + } + + /** + * Clean up any master state, after application has finished. + */ + private void postApplication() { + ((BlockOutputHandleAccessor) masterApi).getBlockOutputHandle(). + closeAllWriters(); + Preconditions.checkState(!computationDone); + computationDone = true; + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java new file mode 100644 index 0000000..8b8e174 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerContextLogic.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import java.util.List; + +import org.apache.giraph.block_app.framework.BlockUtils; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.output.BlockOutputHandle; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; + +/** + * Block execution logic on WorkerContext. + */ +@SuppressWarnings({ "rawtypes" }) +public class BlockWorkerContextLogic { + public static final Logger LOG = + Logger.getLogger(BlockWorkerContextLogic.class); + + private Object workerValue; + private BlockWorkerPieces workerPieces; + private BlockOutputHandle outputHandle; + + private transient BlockWorkerContextSendApi sendApi; + + public BlockWorkerContextLogic() { + } + + public void preApplication(BlockWorkerContextApi api, + BlockOutputHandle outputHandle) { + workerValue = + BlockUtils.BLOCK_WORKER_CONTEXT_VALUE_CLASS.newInstance(api.getConf()); + this.outputHandle = outputHandle; + } + + public Object getWorkerValue() { + return workerValue; + } + + public BlockOutputHandle getOutputHandle() { + return outputHandle; + } + + @SuppressWarnings("unchecked") + public void preSuperstep( + BlockWorkerContextReceiveApi receiveApi, + BlockWorkerContextSendApi sendApi, + BlockWorkerPieces workerPieces, long superstep, + List<Writable> messages) { + LOG.info("Worker executing " + workerPieces + " in " + superstep + + " superstep"); + this.sendApi = sendApi; + this.workerPieces = workerPieces; + if (workerPieces.getReceiver() != null) { + workerPieces.getReceiver().workerContextReceive( + receiveApi, workerValue, messages); + } + } + + public void postSuperstep() { + if (workerPieces.getSender() != null) { + workerPieces.getSender().workerContextSend(sendApi, workerValue); + } + workerPieces = null; + sendApi = null; + outputHandle.returnAllWriters(); + } + + public void postApplication() { + outputHandle.closeAllWriters(); + // TODO add support through conf for postApplication, if needed. + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java new file mode 100644 index 0000000..844160c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerLogic.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexPostprocessor; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.giraph.graph.Vertex; + +/** + * Block execution logic on workers. + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class BlockWorkerLogic { + private final BlockWorkerPieces pieces; + + private transient VertexReceiver receiveFunctions; + private transient InnerVertexSender sendFunctions; + + public BlockWorkerLogic(BlockWorkerPieces pieces) { + this.pieces = pieces; + } + + public void preSuperstep( + BlockWorkerReceiveApi receiveApi, BlockWorkerSendApi sendApi) { + if (pieces.getReceiver() != null) { + receiveFunctions = pieces.getReceiver().getVertexReceiver(receiveApi); + } + if (pieces.getSender() != null) { + sendFunctions = pieces.getSender().getVertexSender(sendApi); + } + } + + public void compute(Vertex vertex, Iterable messages) { + if (receiveFunctions != null) { + receiveFunctions.vertexReceive(vertex, messages); + } + if (sendFunctions != null) { + sendFunctions.vertexSend(vertex); + } + } + + public void postSuperstep() { + if (receiveFunctions instanceof VertexPostprocessor) { + ((VertexPostprocessor) receiveFunctions).postprocess(); + } + if (sendFunctions != null) { + sendFunctions.postprocess(); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java new file mode 100644 index 0000000..3b38cfa --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/BlockWorkerPieces.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; + +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.DefaultMessageClasses; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.factories.DefaultMessageValueFactory; +import org.apache.giraph.master.MasterCompute; +import org.apache.giraph.types.NoMessage; +import org.apache.giraph.utils.UnsafeReusableByteArrayInput; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerGlobalCommUsage; +import org.apache.giraph.writable.kryo.KryoWritableWrapper; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Writable; +import org.apache.log4j.Logger; + +/** + * Pair of pieces to be executed on workers in a superstep + * + * @param <S> Execution stage type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class BlockWorkerPieces<S> { + private static final Logger LOG = Logger.getLogger(BlockWorkerPieces.class); + + /** Aggregator holding next worker computation */ + private static final + String NEXT_WORKER_PIECES = "giraph.blocks.next_worker_pieces"; + + private final PairedPieceAndStage<S> receiver; + private final PairedPieceAndStage<S> sender; + + public BlockWorkerPieces( + PairedPieceAndStage<S> receiver, PairedPieceAndStage<S> sender) { + this.receiver = receiver; + this.sender = sender; + } + + public PairedPieceAndStage<S> getReceiver() { + return receiver; + } + + public PairedPieceAndStage<S> getSender() { + return sender; + } + + public MessageClasses getOutgoingMessageClasses( + ImmutableClassesGiraphConfiguration conf) { + MessageClasses messageClasses; + if (sender == null || sender.getPiece() == null) { + messageClasses = new DefaultMessageClasses( + NoMessage.class, + DefaultMessageValueFactory.class, + null, + MessageEncodeAndStoreType.BYTEARRAY_PER_PARTITION); + } else { + messageClasses = sender.getPiece().getMessageClasses(conf); + } + + messageClasses.verifyConsistent(conf); + return messageClasses; + } + + @Override + public String toString() { + return "[receiver=" + receiver + ",sender=" + sender + "]"; + } + + public String toStringShort() { + String receiverString = + Objects.toString(receiver != null ? receiver.getPiece() : null); + String senderString = + Objects.toString(sender != null ? sender.getPiece() : null); + if (receiverString.equals(senderString)) { + return "[receiver&sender=" + receiverString + "]"; + } else { + return "[receiver=" + receiverString + ",sender=" + senderString + "]"; + } + } + + /** + * Sets which WorkerComputation is going to be executed in the next superstep. + */ + public static <S> void setNextWorkerPieces( + MasterCompute master, BlockWorkerPieces<S> nextWorkerPieces) { + Writable toBroadcast = new KryoWritableWrapper<>(nextWorkerPieces); + byte[] data = WritableUtils.toByteArrayUnsafe(toBroadcast); + + // TODO: extract splitting logic into common utility + int overhead = 4096; + int singleSize = Math.max( + overhead, + GiraphConstants.MAX_MSG_REQUEST_SIZE.get(master.getConf()) - overhead); + + ArrayList<byte[]> splittedData = new ArrayList<>(); + if (data.length < singleSize) { + splittedData.add(data); + } else { + for (int start = 0; start < data.length; start += singleSize) { + splittedData.add(Arrays.copyOfRange( + data, start, Math.min(data.length, start + singleSize))); + } + } + + LOG.info("Next worker piece - total serialized size: " + data.length + + ", split into " + splittedData.size()); + master.getContext().getCounter( + "PassedWorker Stats", "total serialized size") + .increment(data.length); + master.getContext().getCounter( + "PassedWorker Stats", "split parts") + .increment(splittedData.size()); + + master.broadcast(NEXT_WORKER_PIECES, new IntWritable(splittedData.size())); + + for (int i = 0; i < splittedData.size(); i++) { + master.broadcast(NEXT_WORKER_PIECES + "_part_" + i, + KryoWritableWrapper.wrapIfNeeded(splittedData.get(i))); + } + + master.setOutgoingMessageClasses( + nextWorkerPieces.getOutgoingMessageClasses(master.getConf())); + } + + public static <S> BlockWorkerPieces<S> getNextWorkerPieces( + WorkerGlobalCommUsage worker) { + int splits = worker.<IntWritable>getBroadcast(NEXT_WORKER_PIECES).get(); + + int totalLength = 0; + ArrayList<byte[]> splittedData = new ArrayList<>(); + for (int i = 0; i < splits; i++) { + byte[] cur = KryoWritableWrapper.<byte[]>unwrapIfNeeded( + worker.getBroadcast(NEXT_WORKER_PIECES + "_part_" + i)); + splittedData.add(cur); + totalLength += cur.length; + } + + byte[] merged; + if (splits == 1) { + merged = splittedData.get(0); + } else { + merged = new byte[totalLength]; + int index = 0; + for (int i = 0; i < splits; i++) { + System.arraycopy( + splittedData.get(i), 0, merged, index, splittedData.get(i).length); + index += splittedData.get(i).length; + } + } + + KryoWritableWrapper<BlockWorkerPieces<S>> wrapper = + new KryoWritableWrapper<>(); + WritableUtils.fromByteArrayUnsafe( + merged, wrapper, new UnsafeReusableByteArrayInput()); + return wrapper.get(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java new file mode 100644 index 0000000..90fe83e --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/PairedPieceAndStage.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.internal; + +import java.util.List; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerContextSendApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerReceiveApi; +import org.apache.giraph.block_app.framework.api.BlockWorkerSendApi; +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.AbstractPiece.InnerVertexSender; +import org.apache.giraph.block_app.framework.piece.interfaces.VertexReceiver; +import org.apache.hadoop.io.Writable; + +/** + * Object holding piece with it's corresponding execution stage. + * + * @param <S> Execution stage type + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class PairedPieceAndStage<S> { + private final AbstractPiece piece; + private final S executionStage; + + public PairedPieceAndStage(AbstractPiece piece, S executionStage) { + this.piece = piece; + this.executionStage = executionStage; + } + + public S nextExecutionStage() { + // if piece is null, then it cannot change the execution stage + return piece != null ? + (S) piece.nextExecutionStage(executionStage) : executionStage; + } + + public S getExecutionStage() { + return executionStage; + } + + public void registerReducers(BlockMasterApi masterApi) { + if (piece != null) { + piece.wrappedRegisterReducers(masterApi, executionStage); + } + } + + public InnerVertexSender getVertexSender(BlockWorkerSendApi sendApi) { + if (piece != null) { + return piece.getWrappedVertexSender(sendApi, executionStage); + } + return null; + } + + public void masterCompute(BlockMasterApi masterApi) { + if (piece != null) { + piece.masterCompute(masterApi, executionStage); + } + } + + public VertexReceiver getVertexReceiver( + BlockWorkerReceiveApi receiveApi) { + if (piece != null) { + return piece.getVertexReceiver(receiveApi, executionStage); + } + return null; + } + + public void workerContextSend( + BlockWorkerContextSendApi workerContextApi, Object workerValue) { + if (piece != null) { + piece.workerContextSend(workerContextApi, executionStage, workerValue); + } + } + + public void workerContextReceive( + BlockWorkerContextReceiveApi workerContextApi, + Object workerValue, List<Writable> workerMessages) { + if (piece != null) { + piece.workerContextReceive( + workerContextApi, executionStage, workerValue, workerMessages); + } + } + + /** + * @return the piece + */ + public AbstractPiece getPiece() { + return piece; + } + + @Override + public String toString() { + return "Piece " + piece + " in stage " + executionStage; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java new file mode 100644 index 0000000..3ebe8f7 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/internal/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Implementation of execution logic, guiding internal execution of + * Block Application. + */ +package org.apache.giraph.block_app.framework.internal; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java new file mode 100644 index 0000000..6f2a3dd --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputDesc.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.output; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.Progressable; + +/** + * Output description + * + * @param <OW> Writer type + */ +public interface BlockOutputDesc<OW extends BlockOutputWriter> { + /** + * Initialize output and perform any necessary checks + * + * @param jobIdentifier Unique identifier of the job + * @param conf Configuration + */ + void initializeAndCheck(String jobIdentifier, Configuration conf); + + /** + * Create writer + * + * @param conf Configuration + * @param hadoopProgressable Progressable to call progress on + * @return Writer + */ + OW createOutputWriter(Configuration conf, Progressable hadoopProgressable); + + /** + * Commit everything + */ + void commit(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java new file mode 100644 index 0000000..818a311 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/output/BlockOutputFormat.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.output; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.giraph.bsp.BspOutputFormat; +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.GiraphConstants; +import org.apache.giraph.conf.StrConfOption; +import org.apache.giraph.utils.ConfigurationObjectUtils; +import org.apache.giraph.utils.DefaultOutputCommitter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Hadoop output format to use with block output. + * It keeps track of all registered outputs, and knows how to create them. + */ +public class BlockOutputFormat extends BspOutputFormat { + private static final StrConfOption OUTPUT_CONF_OPTIONS = new StrConfOption( + "digraph.outputConfOptions", "", + "List of conf options for outputs used"); + + public static <OD> void addOutputDesc(OD outputDesc, String confOption, + GiraphConfiguration conf) { + GiraphConstants.HADOOP_OUTPUT_FORMAT_CLASS.set(conf, + BlockOutputFormat.class); + String currentOutputs = OUTPUT_CONF_OPTIONS.get(conf); + if (!currentOutputs.isEmpty()) { + currentOutputs = currentOutputs + ","; + } + OUTPUT_CONF_OPTIONS.set(conf, currentOutputs + confOption); + ConfigurationObjectUtils.setObjectKryo(outputDesc, confOption, conf); + } + + private static String[] getOutputConfOptions(Configuration conf) { + String outputConfOptions = OUTPUT_CONF_OPTIONS.get(conf); + return outputConfOptions.isEmpty() ? + new String[0] : outputConfOptions.split(","); + } + + public static <OW extends BlockOutputWriter, OD extends BlockOutputDesc<OW>> + OD createInitAndCheckOutputDesc(String confOption, Configuration conf, + String jobIdentifier) { + OD outputDesc = ConfigurationObjectUtils.getObjectKryo(confOption, conf); + outputDesc.initializeAndCheck(jobIdentifier, conf); + return outputDesc; + } + + public static Map<String, BlockOutputDesc> + createInitAndCheckOutputDescsMap(Configuration conf, String jobIdentifier) { + String[] outputConfOptions = getOutputConfOptions(conf); + Map<String, BlockOutputDesc> ret = new HashMap<>(outputConfOptions.length); + for (String outputConfOption : outputConfOptions) { + ret.put(outputConfOption, + createInitAndCheckOutputDesc(outputConfOption, conf, jobIdentifier)); + } + return ret; + } + + public static Map<String, BlockOutputDesc> createInitAndCheckOutputDescsMap( + JobContext jobContext) { + return createInitAndCheckOutputDescsMap(jobContext.getConfiguration(), + jobContext.getJobID().toString()); + } + + @Override + public void checkOutputSpecs(JobContext jobContext) + throws IOException, InterruptedException { + createInitAndCheckOutputDescsMap(jobContext); + } + + @Override + public OutputCommitter getOutputCommitter( + TaskAttemptContext context) throws IOException, InterruptedException { + return new DefaultOutputCommitter() { + @Override + public void commit(JobContext jobContext) throws IOException { + Map<String, BlockOutputDesc> map = + createInitAndCheckOutputDescsMap(jobContext); + for (BlockOutputDesc outputDesc : map.values()) { + outputDesc.commit(); + } + } + }; + } +}
