http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java new file mode 100644 index 0000000..f06dd89 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/ReducersForPieceHandler.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.global_comm.internal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; +import org.apache.giraph.master.MasterGlobalCommUsage; +import org.apache.giraph.reducers.ReduceOperation; +import org.apache.giraph.reducers.Reducer; +import org.apache.giraph.utils.WritableUtils; +import org.apache.giraph.worker.WorkerBroadcastUsage; +import org.apache.giraph.worker.WorkerReduceUsage; +import org.apache.hadoop.io.Writable; + +/** + * All logic for transforming Giraph's reducer API to reducer handles. + * Contains state of active reducers, and is kept within a Piece. + */ +public class ReducersForPieceHandler implements VertexSenderObserver { + private static final AtomicInteger HANDLER_COUNTER = new AtomicInteger(); + private static final AtomicInteger BROADCAST_COUNTER = new AtomicInteger(); + + private final int handleIndex = HANDLER_COUNTER.incrementAndGet(); + private final AtomicInteger reduceCounter = new AtomicInteger(); + + private final ArrayList<VertexSenderObserver> observers = new ArrayList<>(); + + @Override + public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) { + for (VertexSenderObserver observer : observers) { + observer.vertexSenderWorkerPreprocess(usage); + } + } + + @Override + public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) { + for (VertexSenderObserver observer : observers) { + observer.vertexSenderWorkerPostprocess(usage); + } + } + + public <S, R extends Writable> ReducerHandle<S, R> createLocalReducer( + MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp, + R globalInitialValue) { + LocalReduceHandle<S, R> handle 
= new LocalReduceHandle<>(reduceOp); + master.registerReducer(handle.getName(), reduceOp, globalInitialValue); + observers.add(handle); + return handle; + } + + public <S, R extends Writable> ReducerHandle<S, R> createGlobalReducer( + MasterGlobalCommUsage master, ReduceOperation<S, R> reduceOp, + R globalInitialValue) { + ReduceHandleImpl<S, R> handle = new GlobalReduceHandle<>(reduceOp); + master.registerReducer(handle.getName(), reduceOp, globalInitialValue); + observers.add(handle); + return handle; + } + + /** + * Implementation of BroadcastHandle + * + * @param <T> Value type + */ + public static class BroadcastHandleImpl<T> implements BroadcastHandle<T> { + private final String name; + + public BroadcastHandleImpl() { + this.name = "_utils.broadcast." + BROADCAST_COUNTER.incrementAndGet(); + } + + public String getName() { + return name; + } + + @Override + public T getBroadcast(WorkerBroadcastUsage worker) { + return worker.getBroadcast(name); + } + } + + /** + * Parent implementation of ReducerHandle + * + * @param <S> Single value type + * @param <R> Reduced value type + */ + public abstract class ReduceHandleImpl<S, R extends Writable> + implements ReducerHandle<S, R>, VertexSenderObserver { + protected final ReduceOperation<S, R> reduceOp; + private final String name; + + private ReduceHandleImpl(ReduceOperation<S, R> reduceOp) { + this.reduceOp = reduceOp; + name = "_utils." + handleIndex + + ".reduce." 
+ reduceCounter.incrementAndGet(); + } + + public String getName() { + return name; + } + + @Override + public R getReducedValue(MasterGlobalCommUsage master) { + return master.getReduced(name); + } + + @Override + public BroadcastHandle<R> broadcastValue(BlockMasterApi master) { + return unwrapHandle(master.broadcast( + new WrappedReducedValue<>(reduceOp, getReducedValue(master)))); + } + } + + private static <R extends Writable> BroadcastHandle<R> unwrapHandle( + final BroadcastHandle<WrappedReducedValue<R>> handle) { + return new BroadcastHandle<R>() { + @Override + public R getBroadcast(WorkerBroadcastUsage worker) { + return handle.getBroadcast(worker).getValue(); + } + }; + } + + /** + * Wrapper that makes reduced values self-serializable, + * and allows them to be broadcasted. + * + * @param <R> Reduced value type + */ + public static class WrappedReducedValue<R extends Writable> + implements Writable { + private ReduceOperation<?, R> reduceOp; + private R value; + + public WrappedReducedValue() { + } + + public WrappedReducedValue(ReduceOperation<?, R> reduceOp, R value) { + this.reduceOp = reduceOp; + this.value = value; + } + + @Override + public void write(DataOutput out) throws IOException { + WritableUtils.writeWritableObject(reduceOp, out); + value.write(out); + } + + @Override + public void readFields(DataInput in) throws IOException { + reduceOp = WritableUtils.readWritableObject(in, null); + value = reduceOp.createInitialValue(); + value.readFields(in); + } + + public R getValue() { + return value; + } + } + + /** + * Global Reduce Handle is implementation of ReducerHandle, that will keep + * only one value for each worker, and each call to reduce will have + * to obtain a global lock, and incur synchronization costs. + * Use only when objects are so large, that having many copies cannot fit + * into memory. 
+ * + * @param <S> Single value type + * @param <R> Reduced value type + */ + public class GlobalReduceHandle<S, R extends Writable> + extends ReduceHandleImpl<S, R> { + private transient WorkerReduceUsage usage; + + public GlobalReduceHandle(ReduceOperation<S, R> reduceOp) { + super(reduceOp); + } + + @Override + public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) { + this.usage = usage; + } + + @Override + public void reduce(S valueToReduce) { + usage.reduce(getName(), valueToReduce); + } + + @Override + public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) { + } + } + + /** + * Local Reduce Handle is implementation of ReducerHandle, that will make a + * partially reduced value on each worker thread, which are at the end + * reduced all together. + * This is preferred implementation, unless it cannot be used due to memory + * overhead, because all partially reduced values will not fit the memory. + * + * @param <S> Single value type + * @param <R> Reduced value type + */ + public class LocalReduceHandle<S, R extends Writable> + extends ReduceHandleImpl<S, R> { + private transient Reducer<S, R> reducer; + + public LocalReduceHandle(ReduceOperation<S, R> reduceOp) { + super(reduceOp); + } + + @Override + public void vertexSenderWorkerPreprocess(WorkerReduceUsage usage) { + this.reducer = new Reducer<>(reduceOp); + } + + @Override + public void reduce(S valueToReduce) { + reducer.reduce(valueToReduce); + } + + @Override + public void vertexSenderWorkerPostprocess(WorkerReduceUsage usage) { + usage.reduceMerge(getName(), reducer.getCurrentValue()); + } + } +}
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java new file mode 100644 index 0000000..5b3485f --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/VertexSenderObserver.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.global_comm.internal; + +import org.apache.giraph.worker.WorkerReduceUsage; + +/** + * Observer able to hook into vertex sender pre/post processing. + * <p> + * Both callbacks receive the {@link WorkerReduceUsage} of the worker: + * {@link #vertexSenderWorkerPreprocess} is the pre-processing hook and + * {@link #vertexSenderWorkerPostprocess} is the post-processing hook. + */ +public interface VertexSenderObserver { + void vertexSenderWorkerPreprocess(WorkerReduceUsage usage); + void vertexSenderWorkerPostprocess(WorkerReduceUsage usage); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java new file mode 100644 index 0000000..1ba7c8f --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/internal/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Reducer and Broadcast Handles internal implementation for automatic handling + * of global communication within Pieces, hiding a lot of its complexities. + */ +package org.apache.giraph.block_app.framework.piece.global_comm.internal; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java new file mode 100644 index 0000000..50d7818 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/BroadcastMapHandle.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.global_comm.map; + +import org.apache.giraph.block_app.framework.piece.global_comm.BroadcastHandle; +import org.apache.giraph.worker.WorkerBroadcastUsage; + +/** + * Handle to map of broadcasts: each key maps to the + * {@link BroadcastHandle} of its value. + * + * @param <K> Key type + * @param <V> Value type + */ +public interface BroadcastMapHandle<K, V> + extends MapHandle<K, BroadcastHandle<V>> { + + /** + * Number of elements that were broadcasted. + * + * @param worker Worker broadcast usage + * @return Number of broadcasted elements + */ + int getBroadcastedSize(WorkerBroadcastUsage worker); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java new file mode 100644 index 0000000..db01e77 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/MapHandle.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.global_comm.map; + +/** + * Handle to map of handles underneath + * + * @param <K> Key type + * @param <V> Value type + */ +public interface MapHandle<K, V> { + /** + * Get value for key. + * + * @param key Key to look up + * @return Value stored for the key + */ + V get(K key); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java new file mode 100644 index 0000000..5c31179 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/ReducerMapHandle.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.global_comm.map; + +import org.apache.giraph.block_app.framework.api.BlockMasterApi; +import org.apache.giraph.block_app.framework.piece.global_comm.ReducerHandle; + +/** + * Handle to map of reducers: each key maps to the + * {@link ReducerHandle} of its reducer. + * + * @param <I> Key type + * @param <S> Single value type + * @param <R> Reduced value type + */ +public interface ReducerMapHandle<I, S, R> + extends MapHandle<I, ReducerHandle<S, R>> { + /** + * Number of elements that were reduced. + * + * @param master Master API + * @return Number of reduced elements + */ + int getReducedSize(BlockMasterApi master); + + /** + * Broadcast whole map of reduced values from master. + * + * @param master Master API used for the broadcast + * @return Handle to the broadcasted map. + */ + BroadcastMapHandle<I, R> broadcastValue(BlockMasterApi master); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java new file mode 100644 index 0000000..092f864 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/map/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Interfaces representing map of individual handles + */ +package org.apache.giraph.block_app.framework.piece.global_comm.map; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java new file mode 100644 index 0000000..0d40741 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/global_comm/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Interfaces for Reducer and Broadcast Handles for automatic handling + * of global communication within Pieces, hiding a lot of its complexities. + */ +package org.apache.giraph.block_app.framework.piece.global_comm; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java new file mode 100644 index 0000000..b6cc749 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexPostprocessor.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.interfaces; + +import org.apache.giraph.writable.kryo.markers.NonKryoWritable; + +/** + * Interface containing a single function - postprocess. + * + * Marked to not allow serialization, as it should be created on the worker, + * so should never be serialized; this is disallowed only to catch problems + * early. + */ +public interface VertexPostprocessor extends NonKryoWritable { + /** + * Override to finish computation. This method is executed exactly once + * after computation for all vertices in the partition is complete. + */ + void postprocess(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java new file mode 100644 index 0000000..26912ee --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexReceiver.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.interfaces; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.writable.kryo.markers.NonKryoWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface representing actions that happen on worker, for each vertex, + * during receive phase: + * <ul> + * <li> to receive messages from vertices </li> + * <li> to receive data from master through aggregators </li> + * </ul> + * + * Marked to not allow serialization, as it should be created on the worker, + * so should never be serialized; this is disallowed only to catch problems + * early. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <M> Message type + */ +@SuppressWarnings("rawtypes") +public interface VertexReceiver<I extends WritableComparable, + V extends Writable, E extends Writable, M extends Writable> + extends NonKryoWritable { + /** + * Must be defined by user to do computation on a single Vertex. + * + * @param vertex Vertex + * @param messages Messages that were sent to this vertex in the previous + * superstep. Each message is only guaranteed to have + * a life expectancy as long as next() is not called. 
+ */ + void vertexReceive(Vertex<I, V, E> vertex, Iterable<M> messages); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java new file mode 100644 index 0000000..0587032 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/VertexSender.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.interfaces; + +import org.apache.giraph.graph.Vertex; +import org.apache.giraph.writable.kryo.markers.NonKryoWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Interface representing actions that happen on worker, for each vertex, + * during send phase: + * <ul> + * <li> to send messages to vertices </li> + * <li> to send data for aggregation on master </li> + * </ul> + * + * Marked to not allow serialization, as it should be created on the worker, + * so should never be serialized; this is disallowed only to catch problems + * early. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + */ +@SuppressWarnings("rawtypes") +public interface VertexSender<I extends WritableComparable, + V extends Writable, E extends Writable> extends NonKryoWritable { + /** + * Must be defined by user to do computation on a single Vertex. + * + * @param vertex Vertex + */ + void vertexSend(Vertex<I, V, E> vertex); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java new file mode 100644 index 0000000..db05e78 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/interfaces/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Vertex processing functions for Pieces + */ +package org.apache.giraph.block_app.framework.piece.interfaces; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java new file mode 100644 index 0000000..dd977e6 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/ObjectMessageClasses.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.piece.messages; + +import org.apache.giraph.combiner.MessageCombiner; +import org.apache.giraph.comm.messages.MessageEncodeAndStoreType; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.conf.MessageClasses; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.utils.ReflectionUtils; +import org.apache.giraph.writable.kryo.KryoWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +import com.google.common.base.Preconditions; + +/** + * MessageClasses implementation that provides factory and combiner instances + * through a provided supplier. + * + * @param <I> Vertex id type + * @param <M> Message type + */ +public class ObjectMessageClasses<I extends WritableComparable, + M extends Writable> extends KryoWritable implements MessageClasses<I, M> { + private final Class<M> messageClass; + private final SupplierFromConf<MessageValueFactory<M>> + messageValueFactorySupplier; + private final SupplierFromConf<? extends MessageCombiner<? super I, M>> + messageCombinerSupplier; + private final MessageEncodeAndStoreType messageEncodeAndStoreType; + /** + * Creates an instance with every field set to null. + * NOTE(review): presumably needed for KryoWritable deserialization - confirm. + */ + public ObjectMessageClasses() { + this(null, null, null, null); + } + /** + * Creates an instance from the given class, suppliers and store type. + * + * @param messageClass Message class + * @param messageValueFactorySupplier Supplier of the message value factory + * @param messageCombinerSupplier Supplier of the message combiner, or null + * when no combiner is used + * @param messageEncodeAndStoreType Message encode and store type + */ + public ObjectMessageClasses(Class<M> messageClass, + SupplierFromConf<MessageValueFactory<M>> messageValueFactorySupplier, + SupplierFromConf<? extends MessageCombiner<? 
super I, M>> + messageCombinerSupplier, + MessageEncodeAndStoreType messageEncodeAndStoreType) { + this.messageClass = messageClass; + this.messageValueFactorySupplier = messageValueFactorySupplier; + this.messageCombinerSupplier = messageCombinerSupplier; + this.messageEncodeAndStoreType = messageEncodeAndStoreType; + } + + @Override + public Class<M> getMessageClass() { + return messageClass; + } + /** + * Applies the factory supplier to the configuration and rejects + * a null result through Preconditions.checkNotNull. + */ + @Override + public MessageValueFactory<M> createMessageValueFactory( + ImmutableClassesGiraphConfiguration conf) { + return Preconditions.checkNotNull(messageValueFactorySupplier.apply(conf)); + } + /** + * Returns the combiner produced by the combiner supplier (rejecting + * a null result), or null when no combiner supplier was given. + */ + @Override + public MessageCombiner<? super I, M> createMessageCombiner( + ImmutableClassesGiraphConfiguration<I, ? extends Writable, + ? extends Writable> conf) { + return messageCombinerSupplier != null ? + Preconditions.checkNotNull(messageCombinerSupplier.apply(conf)) : null; + } + /** + * A combiner is used exactly when a combiner supplier was given. + */ + @Override + public boolean useMessageCombiner() { + return messageCombinerSupplier != null; + } + + @Override + public MessageEncodeAndStoreType getMessageEncodeAndStoreType() { + return messageEncodeAndStoreType; + } + /** + * Returns a new instance holding the same message class, suppliers + * and store type as this one. + */ + @Override + public MessageClasses<I, M> createCopyForNewSuperstep() { + return new ObjectMessageClasses<>( + messageClass, messageValueFactorySupplier, + messageCombinerSupplier, messageEncodeAndStoreType); + } + /** + * Checks that the value factory creates instances whose class equals + * messageClass and, when a combiner supplier is present, that the + * initial message of the combiner has that class too and that the + * combiner type arguments pass ReflectionUtils.verifyTypes against + * the vertex id class of the configuration and messageClass. + */ + @Override + public void verifyConsistent(ImmutableClassesGiraphConfiguration conf) { + MessageValueFactory<M> messageValueFactory = + messageValueFactorySupplier.apply(conf); + Preconditions.checkState( + messageValueFactory.newInstance().getClass().equals(messageClass)); + /* Combiner checks apply only when a combiner supplier was given. */ + if (messageCombinerSupplier != null) { + MessageCombiner<? 
super I, M> messageCombiner = + messageCombinerSupplier.apply(conf); + Preconditions.checkState(messageCombiner.createInitialMessage() + .getClass().equals(messageClass)); + Class<?>[] combinerTypes = ReflectionUtils.getTypeArguments( + MessageCombiner.class, messageCombiner.getClass()); + ReflectionUtils.verifyTypes(conf.getVertexIdClass(), combinerTypes[0], + "Vertex id", messageCombiner.getClass()); + ReflectionUtils.verifyTypes(messageClass, combinerTypes[1], + "Outgoing message", messageCombiner.getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java new file mode 100644 index 0000000..00c86cd --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/SupplierFromConf.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.piece.messages; + +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.factories.DefaultMessageValueFactory; +import org.apache.giraph.factories.MessageValueFactory; +import org.apache.giraph.function.Function; +import org.apache.giraph.writable.kryo.HadoopKryo; +import org.apache.hadoop.io.Writable; + +/** + * Supplier from configuration + * @param <T> Type of object returned + */ +public interface SupplierFromConf<T> + extends Function<ImmutableClassesGiraphConfiguration, T> { + + /** + * Supplier from configuration, by copying given instance every time. + * + * @param <T> Type of object returned + */ + public static class SupplierFromConfByCopy<T> implements SupplierFromConf<T> { + private final T value; + + public SupplierFromConfByCopy(T value) { + this.value = value; + } + + @Override + public T apply(ImmutableClassesGiraphConfiguration conf) { + return HadoopKryo.createCopy(value); + } + } + + /** + * Supplier from configuration returning DefaultMessageValueFactory instances. 
+ * + * @param <M> Message type + */ + public static class DefaultMessageFactorySupplierFromConf<M extends Writable> + implements SupplierFromConf<MessageValueFactory<M>> { + private final Class<M> messageClass; + + public DefaultMessageFactorySupplierFromConf(Class<M> messageClass) { + this.messageClass = messageClass; + } + + @Override + public MessageValueFactory<M> apply( + ImmutableClassesGiraphConfiguration conf) { + return new DefaultMessageValueFactory<>(messageClass, conf); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java new file mode 100644 index 0000000..ba3014c --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/messages/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Utility classes for handling of messages within Pieces + */ +package org.apache.giraph.block_app.framework.piece.messages; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java new file mode 100644 index 0000000..fbc6e92 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/block_app/framework/piece/package-info.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Single execution object - Piece, and related classes. + * + * AbstractPiece is parent class of all Pieces. 
Most frequently + * users should extend Piece class itself + */ +package org.apache.giraph.block_app.framework.piece; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java new file mode 100644 index 0000000..2a0e36a --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Consumer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.giraph.function; + +import java.io.Serializable; + + +/** + * Function: + * (T) -> void + * + * @param <T> Argument type + */ +public interface Consumer<T> extends Serializable { + /** + * Applies this function to {@code input} + */ + void apply(T input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java new file mode 100644 index 0000000..41046ba --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Function.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function; + +import java.io.Serializable; + + +/** + * Function: + * (F) -> T + * + * @param <F> Argument type + * @param <T> Result type + */ +public interface Function<F, T> extends Serializable { + /** + * Returns the result of applying this function to given {@code input}. 
+ * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + T apply(F input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java new file mode 100644 index 0000000..012ec82 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairConsumer.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.function; + +import java.io.Serializable; + +/** + * Function: + * (T1, T2) -> void + * + * @param <T1> First argument type + * @param <T2> Second argument type + */ +public interface PairConsumer<T1, T2> extends Serializable { + /** + * Applies this function to {@code input1} and {@code input2} + */ + void apply(T1 input1, T2 input2); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java new file mode 100644 index 0000000..bfff400 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/PairFunction.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.function; + +import java.io.Serializable; + + +/** + * Function: + * (F1, F2) -> T + * + * @param <F1> First argument type + * @param <F2> Second argument type + * @param <T> Result type + */ +public interface PairFunction<F1, F2, T> extends Serializable { + /** + * Returns the result of applying this function to given + * {@code input1} and {@code input2}. + * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + T apply(F1 input1, F2 input2); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java new file mode 100644 index 0000000..1813b54 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/Supplier.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.function; + +import java.io.Serializable; + +/** + * Function: + * () -> T + * <br> + * Specialization of com.google.common.base.Supplier, that is also + * Serializable. + * + * @param <T> Result type + */ +public interface Supplier<T> extends Serializable { + /** + * Retrieves an instance of the appropriate type. The returned object may or + * may not be a new instance, depending on the implementation. + */ + T get(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java new file mode 100644 index 0000000..b089da6 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package for interfaces representing different functions, + * that all extend Serializable in order for Kryo to be able + * to serialize them.
+ * + * Even when same interface is present in Guava, we do not extend it + * due to @Nullable annotations adding requirement of handling nulls. + */ +package org.apache.giraph.function; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java new file mode 100644 index 0000000..2cf74e1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/IntSupplier.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.primitive; + +/** + * Primitive specialization of Function: + * () -> int + */ +public interface IntSupplier { + /** Retrieves an int value. 
*/ + int get(); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java new file mode 100644 index 0000000..fdf40ff --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/primitive/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Primitive specializations of interfaces from org.apache.giraph.function + * package. 
+ */ +package org.apache.giraph.function.primitive; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java new file mode 100644 index 0000000..87e7f9b --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/ConsumerWithVertex.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.vertex; + +import java.io.Serializable; + +import org.apache.giraph.function.PairConsumer; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + + +/** + * Function: + * (vertex, T) -> void + * + * A class that can consume objects of a single type, when given a vertex. 
+ * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <T> Argument type + */ +@SuppressWarnings("rawtypes") +public interface ConsumerWithVertex<I extends WritableComparable, + V extends Writable, E extends Writable, T> + extends PairConsumer<Vertex<I, V, E>, T>, Serializable { + /** + * Applies this function to {@code vertex} and {@code input} + */ + @Override + void apply(Vertex<I, V, E> vertex, T value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java new file mode 100644 index 0000000..fdab5de --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/FunctionWithVertex.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.function.vertex; + +import java.io.Serializable; + +import org.apache.giraph.function.PairFunction; +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Function: + * (vertex, F) -> T + * + * Determines an output value based on a vertex and an input value. + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <F> Argument type + * @param <T> Result type + */ +@SuppressWarnings("rawtypes") +public interface FunctionWithVertex<I extends WritableComparable, + V extends Writable, E extends Writable, F, T> + extends PairFunction<Vertex<I, V, E>, F, T>, Serializable { + /** + * Returns the result of applying this function to given + * {@code vertex} and {@code input}. + * + * The returned object may or may not be a new instance, + * depending on the implementation. + */ + @Override + T apply(Vertex<I, V, E> vertex, F input); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java new file mode 100644 index 0000000..bc0f9c1 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/SupplierFromVertex.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.function.vertex; + +import java.io.Serializable; + +import org.apache.giraph.graph.Vertex; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Function: + * (vertex) -> T + * + * A class that can supply objects of a single type, when given a vertex. + * + * (doesn't extend Function<Vertex<I, V, E>, T>, because of different + * method names) + * + * @param <I> Vertex id type + * @param <V> Vertex value type + * @param <E> Edge value type + * @param <T> Result type + */ +@SuppressWarnings("rawtypes") +public interface SupplierFromVertex<I extends WritableComparable, + V extends Writable, E extends Writable, T> extends Serializable { + /** + * Retrieves an instance of the appropriate type, given a vertex. + * The returned object may or may not be a new instance, + * depending on the implementation. 
+ */ + T get(Vertex<I, V, E> vertex); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java new file mode 100644 index 0000000..bd5b019 --- /dev/null +++ b/giraph-block-app/src/main/java/org/apache/giraph/function/vertex/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package for interfaces representing functions additionally + * performed on vertex values. 
+ */ +package org.apache.giraph.function.vertex; http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java new file mode 100644 index 0000000..6487d95 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/BlockTestingUtils.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.block; + +import static org.junit.Assert.assertEquals; +import it.unimi.dsi.fastutil.ints.IntArrayList; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.Random; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class BlockTestingUtils { + + BlockTestingUtils() { } + + private static final int NUM_TRIALS = 10; + private static final int REPEAT_TIMES = 10; + + private static int testSequential(Iterator<? extends AbstractPiece> referenceImpl, + Iterator<? extends AbstractPiece> testImpl) { + int length = 0; + + CheckIterator checkIterator = new CheckIterator(referenceImpl, testImpl); + while (checkIterator.hasNext()) { + checkIterator.next(); + length++; + } + + System.out.println("Length is : " + length); + return length; + } + + private static boolean anyHasNext(ArrayList<? extends Iterator> arr) { + for (Iterator t : arr) { + if (t.hasNext()) { + return true; + } + } + return false; + } + + private static void testRandom(int length, + Iterable<? extends AbstractPiece> referenceImpl, + Iterable<? 
extends AbstractPiece> testImpl) { + Random rand = new Random(); + + ArrayList<CheckIterator<AbstractPiece>> arr = new ArrayList<>(); + IntArrayList lengths = new IntArrayList(NUM_TRIALS); + for (int i = 0; i < NUM_TRIALS; i++) { + lengths.add(0); + } + for (int i = 0; i < NUM_TRIALS; i++) { + arr.add(new CheckIterator(referenceImpl.iterator(), testImpl.iterator())); + } + + int totalCount = 0; + while (anyHasNext(arr)) { + int index = rand.nextInt(NUM_TRIALS); + while (!arr.get(index).hasNext()) { + index = rand.nextInt(NUM_TRIALS); + } + CheckIterator it = arr.get(index); + it.next(); + int itLength = lengths.getInt(index); + lengths.set(index, itLength + 1); + totalCount++; + } + assertEquals("TotalCount should be length * NUM_TRIALS", length * NUM_TRIALS, totalCount); + System.out.println("Final count is : " + totalCount); + } + + /** + * Tests both the length of the iterator returned by the block, as-well as the deterministic behavior + * expected by calling .iterator() against the referenceImpl. + * @param referenceImpl : A list of pieces in the expected order + * @param testImpl : A list of pieces to test against (the Block) + */ + public static void testIndependence(Iterable<? extends AbstractPiece> referenceImpl, + Iterable<? extends AbstractPiece> testImpl) { + int length = testSequential(referenceImpl.iterator(), testImpl.iterator()); + testRandom(length, referenceImpl, testImpl); + } + + /** + * Test how the block interacts with a repeatBlock. The expected result is to + * see the pieces in referenceImpl show up REPEAT_TIMES many times. + * @param referenceImpl : A list of pieces in the expected order + * @param block : The block to test + */ + public static void testNestedRepeatBlock(Iterable<? 
extends AbstractPiece> referenceImpl, Block block) { + Block repeatBlock = new RepeatBlock( + REPEAT_TIMES, + block + ); + testIndependence( + Iterables.concat(Collections.nCopies(REPEAT_TIMES, referenceImpl)), + repeatBlock + ); + } + + public static class CheckIterator<T> implements Iterator { + + private final Iterator<T> fst; + private final Iterator<T> snd; + + public CheckIterator(Iterator<T> fst, Iterator<T> snd) { + this.fst = fst; + this.snd = snd; + } + + @Override + public boolean hasNext() { + boolean fstHasNxt = fst.hasNext(); + boolean sndHasNxt = snd.hasNext(); + Preconditions.checkArgument(fstHasNxt == sndHasNxt, "Expect hasNext() on " + + "both iterators to be identical. Got: " + fst.hasNext() + " and " + snd.hasNext()); + return fstHasNxt; + } + + @Override + public Object next() { + T fstNxt = fst.next(); + T sndNxt = snd.next(); + Preconditions.checkArgument(fstNxt == sndNxt, "Expect objs returned by " + + "both iterators to be identical. Got: " + fstNxt + " and " + sndNxt); + return fstNxt; + } + + @Override + public void remove() { + throw new RuntimeException("Not implemented"); + } + + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java new file mode 100644 index 0000000..0dacae1 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestIfBlock.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.block_app.framework.block; + + +import java.util.Arrays; + +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.function.Supplier; +import org.junit.Test; + +public class TestIfBlock { + + private static final Supplier<Boolean> TRUE_SUPPLIER = new Supplier<Boolean>() { + @Override + public Boolean get() { + return true; + } + }; + + private static final Supplier<Boolean> FALSE_SUPPLIER = new Supplier<Boolean>() { + @Override + public Boolean get() { + return false; + } + }; + + @Test + // Test short-circuiting the if -> then + public void testIfBlockThen() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block ifBlock = new IfBlock( + TRUE_SUPPLIER, + new SequenceBlock(piece1, piece2) + ); + + BlockTestingUtils.testIndependence( + Arrays.asList(piece1, piece2), + ifBlock); + } + + @Test + // Test short-circuiting the if -> else + public void testIfBlockElse() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block ifBlock = new IfBlock( + FALSE_SUPPLIER, + new EmptyBlock(), + new SequenceBlock(piece1, piece2) + ); + + BlockTestingUtils.testIndependence( + Arrays.asList(piece1, piece2), + ifBlock); + } + + @Test + public void testIfNestedInRepeat() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block ifBlock = new IfBlock( + 
TRUE_SUPPLIER, + new SequenceBlock(piece1, piece2) + ); + + BlockTestingUtils.testNestedRepeatBlock( + Arrays.asList(piece1, piece2), + ifBlock); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java new file mode 100644 index 0000000..1e096ba --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatBlock.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.block; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * Tests repeatBlock's correctness + */ +public class TestRepeatBlock { + + public static final int REPEAT_TIMES = 5; + + @Test + public void testRepeatBlockBasic() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block innerBlock = new SequenceBlock(piece1, piece2); + Block repeatBlock = new RepeatBlock( + REPEAT_TIMES, + innerBlock + ); + BlockTestingUtils.testIndependence( + Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))), + repeatBlock); + } + + @Test + public void testNestedRepeatBlock() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block innerBlock = new SequenceBlock(piece1, piece2); + Block repeatBlock = new RepeatBlock( + REPEAT_TIMES, + innerBlock + ); + BlockTestingUtils.testNestedRepeatBlock( + Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))), + repeatBlock); + } + + @Test + public void testRepeatBlockEmpty() throws Exception { + Block innerBlock = new EmptyBlock(); + Block repeatBlock = new RepeatBlock( + REPEAT_TIMES, + innerBlock + ); + List<? extends AbstractPiece> referenceImpl = Collections.emptyList(); + BlockTestingUtils.testIndependence( + // Concatenating EmptyIterator = just EmptyIterator. 
No obj's to + // compare against either + referenceImpl, + repeatBlock); + } + +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java ---------------------------------------------------------------------- diff --git a/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java new file mode 100644 index 0000000..242d376 --- /dev/null +++ b/giraph-block-app/src/test/java/org/apache/giraph/block_app/framework/block/TestRepeatUntilBlock.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.block_app.framework.block; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; + +import org.apache.giraph.block_app.framework.piece.AbstractPiece; +import org.apache.giraph.block_app.framework.piece.Piece; +import org.apache.giraph.function.Supplier; +import org.junit.Test; + +import com.google.common.collect.Iterables; + +/** + * Tests repeatUntilBlock's correctness + */ +public class TestRepeatUntilBlock { + + public static final int REPEAT_TIMES = 5; + + private static final Supplier<Boolean> falseSupplier = new Supplier<Boolean>() { + @Override + public Boolean get() { + return false; + } + }; + + @Test + public void testRepeatUntilBlockBasic() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block innerBlock = new SequenceBlock(piece1, piece2); + Block repeatBlock = new RepeatUntilBlock( + REPEAT_TIMES, + innerBlock, + falseSupplier + ); + BlockTestingUtils.testIndependence( + Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))), + repeatBlock); + } + + @Test + public void testNestedRepeatUntilBlock() throws Exception { + Piece piece1 = new Piece(); + Piece piece2 = new Piece(); + Block innerBlock = new SequenceBlock(piece1, piece2); + Block repeatBlock = new RepeatUntilBlock( + REPEAT_TIMES, + innerBlock, + falseSupplier + ); + BlockTestingUtils.testNestedRepeatBlock( + Iterables.concat(Collections.nCopies(REPEAT_TIMES, Arrays.asList(piece1, piece2))), + repeatBlock); + } + + @Test + public void testRepeatUntilBlockUnlimited() throws Exception { + Block innerBlock = new SequenceBlock(new Piece()); + // Can't test with testIndependence - spin up our own test inline + Supplier<Boolean> countingSupplier = new Supplier<Boolean>() { + private int i = 0; + + @Override + public Boolean get() { + i++; + return i > REPEAT_TIMES; + } + }; + Block repeatBlock = RepeatUntilBlock.unlimited( + 
innerBlock, + countingSupplier + ); + int count = 0; + Iterator<AbstractPiece> it = repeatBlock.iterator(); + while (it.hasNext()) { + it.next(); + count++; + } + assertEquals("Count must be equal to REPEAT_TIMES", REPEAT_TIMES, count); + } + +}
