Repository: giraph Updated Branches: refs/heads/trunk bad44d472 -> fda1bb382
Improving and adding reducers Summary: - adding common reducers (MaxReduce, MinReduce, SumReduce), each written once generically over NumericTypeOps, so there is no per-type code duplication - adding NumericTypeOps functions to support the above - adding a Varint encoding class - renaming registerReduce -> registerReducer Test Plan: mvn install Reviewers: pavanka, sergey.edunov, maja.kabiljo Reviewed By: maja.kabiljo Differential Revision: https://reviews.facebook.net/D28983 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/fda1bb38 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/fda1bb38 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/fda1bb38 Branch: refs/heads/trunk Commit: fda1bb3820ac574ed5d59cb779171e313798336d Parents: bad44d4 Author: Igor Kabiljo <[email protected]> Authored: Mon Dec 1 14:21:12 2014 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Mon Dec 1 14:24:10 2014 -0800 ---------------------------------------------------------------------- .../matrix/dense/FloatDenseMatrix.java | 8 +- .../matrix/dense/IntDenseMatrix.java | 8 +- .../matrix/sparse/DoubleSparseVector.java | 14 +- .../matrix/sparse/FloatSparseMatrix.java | 8 +- .../matrix/sparse/IntSparseMatrix.java | 8 +- .../matrix/sparse/LongSparseVector.java | 14 +- .../giraph/benchmark/ReducersBenchmark.java | 8 +- .../AggregatorToGlobalCommTranslation.java | 4 +- .../giraph/master/MasterAggregatorHandler.java | 6 +- .../org/apache/giraph/master/MasterCompute.java | 8 +- .../giraph/master/MasterGlobalCommUsage.java | 4 +- .../apache/giraph/reducers/impl/MaxReduce.java | 83 ++++++++ .../apache/giraph/reducers/impl/MinReduce.java | 83 ++++++++ .../apache/giraph/reducers/impl/SumReduce.java | 81 ++++++++ .../giraph/reducers/impl/package-info.java | 21 ++ .../apache/giraph/types/ops/DoubleTypeOps.java | 34 +++- .../apache/giraph/types/ops/FloatTypeOps.java | 33 +++- .../org/apache/giraph/types/ops/IntTypeOps.java | 33 +++- .../apache/giraph/types/ops/LongTypeOps.java | 33
+++- .../apache/giraph/types/ops/NumericTypeOps.java | 67 +++++++ .../org/apache/giraph/types/ops/TypeOps.java | 3 + .../apache/giraph/types/ops/TypeOpsUtils.java | 27 +++ .../types/ops/collections/BasicArrayList.java | 41 ++++ .../java/org/apache/giraph/utils/Varint.java | 198 +++++++++++++++++++ .../org/apache/giraph/utils/WritableUtils.java | 39 ++++ .../worker/WorkerAggregatorDelegator.java | 5 + .../giraph/worker/WorkerAggregatorHandler.java | 16 +- .../apache/giraph/worker/WorkerReduceUsage.java | 9 + 28 files changed, 854 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java index ce75d6d..588a92a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/FloatDenseMatrix.java @@ -26,9 +26,9 @@ import java.util.ArrayList; */ public class FloatDenseMatrix { /** The number of rows in the matrix */ - private int numRows; + private final int numRows; /** The number of columns in the matrix */ - private int numColumns; + private final int numColumns; /** The rows of the matrix */ private ArrayList<FloatDenseVector> rows = null; @@ -109,7 +109,7 @@ public class FloatDenseMatrix { * @param i the row number * @return the row of the matrix */ - FloatDenseVector getRow(int i) { + public FloatDenseVector getRow(int i) { return rows.get(i); } @@ -118,7 +118,7 @@ public class FloatDenseMatrix { * * @param vec the vector to add */ - void addRow(FloatDenseVector vec) { + public void 
addRow(FloatDenseVector vec) { if (rows.size() >= numRows) { throw new RuntimeException("Cannot add more rows!"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java index ed85574..9ec5439 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/dense/IntDenseMatrix.java @@ -26,9 +26,9 @@ import java.util.ArrayList; */ public class IntDenseMatrix { /** The number of rows in the matrix */ - private int numRows; + private final int numRows; /** The number of columns in the matrix */ - private int numColumns; + private final int numColumns; /** The rows of the matrix */ private ArrayList<IntDenseVector> rows = null; @@ -109,7 +109,7 @@ public class IntDenseMatrix { * @param i the row number * @return the row of the matrix */ - IntDenseVector getRow(int i) { + public IntDenseVector getRow(int i) { return rows.get(i); } @@ -118,7 +118,7 @@ public class IntDenseMatrix { * * @param vec the vector to add */ - void addRow(IntDenseVector vec) { + public void addRow(IntDenseVector vec) { if (rows.size() >= numRows) { throw new RuntimeException("Cannot add more rows!"); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java index 
fb54459..7abf0e6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/DoubleSparseVector.java @@ -19,14 +19,13 @@ package org.apache.giraph.aggregators.matrix.sparse; import it.unimi.dsi.fastutil.ints.Int2DoubleMap; +import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap; +import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import it.unimi.dsi.fastutil.ints.Int2DoubleOpenHashMap; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - import org.apache.hadoop.io.Writable; /** @@ -85,6 +84,15 @@ public class DoubleSparseVector implements Writable { } /** + * Increment value for a given key + * @param key Key + * @param value Increment + */ + public void add(int key, double value) { + entries.addTo(key, value); + } + + /** * Clear the contents of the vector. */ public void clear() { http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java index a54ae31..e021555 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/FloatSparseMatrix.java @@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; */ public class FloatSparseMatrix { /** The number of rows in the matrix */ - private int numRows; + private final int numRows; /** The rows of the matrix */ - private Int2ObjectOpenHashMap<FloatSparseVector> rows; + private final 
Int2ObjectOpenHashMap<FloatSparseVector> rows; /** * Create a new matrix with the given number of rows. @@ -88,7 +88,7 @@ public class FloatSparseMatrix { * @param i the row number * @return the row of the matrix */ - FloatSparseVector getRow(int i) { + public FloatSparseVector getRow(int i) { return rows.get(i); } @@ -98,7 +98,7 @@ public class FloatSparseMatrix { * @param i the row * @param vec the vector to set as the row */ - void setRow(int i, FloatSparseVector vec) { + public void setRow(int i, FloatSparseVector vec) { rows.put(i, vec); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java index b7cde77..29e8482 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/IntSparseMatrix.java @@ -26,9 +26,9 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; */ public class IntSparseMatrix { /** The number of rows in the matrix */ - private int numRows; + private final int numRows; /** The rows of the matrix */ - private Int2ObjectOpenHashMap<IntSparseVector> rows; + private final Int2ObjectOpenHashMap<IntSparseVector> rows; /** * Create a new matrix with the given number of rows. 
@@ -88,7 +88,7 @@ public class IntSparseMatrix { * @param i the row number * @return the row of the matrix */ - IntSparseVector getRow(int i) { + public IntSparseVector getRow(int i) { return rows.get(i); } @@ -98,7 +98,7 @@ public class IntSparseMatrix { * @param i the row * @param vec the vector to set as the row */ - void setRow(int i, IntSparseVector vec) { + public void setRow(int i, IntSparseVector vec) { rows.put(i, vec); } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java index 6337215..6a16525 100644 --- a/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java +++ b/giraph-core/src/main/java/org/apache/giraph/aggregators/matrix/sparse/LongSparseVector.java @@ -19,14 +19,13 @@ package org.apache.giraph.aggregators.matrix.sparse; import it.unimi.dsi.fastutil.ints.Int2LongMap; +import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap; +import it.unimi.dsi.fastutil.objects.ObjectIterator; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap; -import it.unimi.dsi.fastutil.objects.ObjectIterator; - import org.apache.hadoop.io.Writable; /** @@ -85,6 +84,15 @@ public class LongSparseVector implements Writable { } /** + * Increment value for a given key + * @param key Key + * @param value Increment + */ + public void add(int key, long value) { + entries.addTo(key, value); + } + + /** * Clear the contents of the vector. 
*/ public void clear() { http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java index 263274d..16c33e9 100644 --- a/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java +++ b/giraph-core/src/main/java/org/apache/giraph/benchmark/ReducersBenchmark.java @@ -126,15 +126,15 @@ public class ReducersBenchmark extends GiraphBenchmark { String mi = "m" + i; String pi = "p" + i; - registerReduce(wi, TestLongSumReducer.INSTANCE); - registerReduce(mi, new TestLongSumReducer()); + registerReducer(wi, TestLongSumReducer.INSTANCE); + registerReducer(mi, new TestLongSumReducer()); if (superstep > 0) { broadcast(wi, getReduced(wi)); broadcast(mi, new LongWritable(-superstep * i)); broadcast(pi, getReduced(pi)); - registerReduce(pi, new TestLongSumReducer(), + registerReducer(pi, new TestLongSumReducer(), (LongWritable) getReduced(pi)); assertEquals(superstep * (getTotalNumVertices() * i) + w, @@ -142,7 +142,7 @@ public class ReducersBenchmark extends GiraphBenchmark { assertEquals(superstep * getTotalNumVertices() * i, ((LongWritable) getReduced(pi)).get()); } else { - registerReduce(pi, new TestLongSumReducer()); + registerReducer(pi, new TestLongSumReducer()); } } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java index eb25182..fa3f376 100644 --- 
a/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/AggregatorToGlobalCommTranslation.java @@ -135,10 +135,10 @@ public class AggregatorToGlobalCommTranslation AggregatorReduceOperation<Writable> cleanReduceOp = entry.getValue().createReduceOp(); if (entry.getValue().isPersistent()) { - globalComm.registerReduce( + globalComm.registerReducer( entry.getKey(), cleanReduceOp, value); } else { - globalComm.registerReduce( + globalComm.registerReducer( entry.getKey(), cleanReduceOp); } entry.getValue().setCurrentValue(null); http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java index ccee656..98de9d6 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterAggregatorHandler.java @@ -80,13 +80,13 @@ public class MasterAggregatorHandler } @Override - public final <S, R extends Writable> void registerReduce( + public final <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp) { - registerReduce(name, reduceOp, reduceOp.createInitialValue()); + registerReducer(name, reduceOp, reduceOp.createInitialValue()); } @Override - public <S, R extends Writable> void registerReduce( + public <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) { if (reducerMap.containsKey(name)) { http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java index 68eb416..eb4144a 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterCompute.java @@ -190,15 +190,15 @@ public abstract class MasterCompute } @Override - public final <S, R extends Writable> void registerReduce( + public final <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp) { - serviceMaster.getGlobalCommHandler().registerReduce(name, reduceOp); + serviceMaster.getGlobalCommHandler().registerReducer(name, reduceOp); } @Override - public final <S, R extends Writable> void registerReduce( + public final <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp, R globalInitialValue) { - serviceMaster.getGlobalCommHandler().registerReduce( + serviceMaster.getGlobalCommHandler().registerReducer( name, reduceOp, globalInitialValue); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java index c3ce0ea..7ee9048 100644 --- a/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/master/MasterGlobalCommUsage.java @@ -33,7 +33,7 @@ public interface MasterGlobalCommUsage { * @param <S> Single value type * @param <R> Reduced value type */ - <S, R extends Writable> void registerReduce( + <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp); /** @@ -48,7 
+48,7 @@ public interface MasterGlobalCommUsage { * @param <S> Single value type * @param <R> Reduced value type */ - <S, R extends Writable> void registerReduce( + <S, R extends Writable> void registerReducer( String name, ReduceOperation<S, R> reduceOp, R globalInitialValue); /** http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java new file mode 100644 index 0000000..9d603a1 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MaxReduce.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.types.ops.DoubleTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Reducer for calculating max of values + * @param <T> Value type + */ +public class MaxReduce<T extends WritableComparable> + extends OnSameReduceOperation<T> { + /** DoubleWritable specialization */ + public static final MaxReduce<DoubleWritable> DOUBLE = + new MaxReduce<>(DoubleTypeOps.INSTANCE); + /** LongWritable specialization */ + public static final MaxReduce<LongWritable> LONG = + new MaxReduce<>(LongTypeOps.INSTANCE); + + /** Value type operations */ + private NumericTypeOps<T> typeOps; + + /** Constructor used for deserialization only */ + public MaxReduce() { + } + + /** + * Constructor + * @param typeOps Value type operations + */ + public MaxReduce(NumericTypeOps<T> typeOps) { + this.typeOps = typeOps; + } + + @Override + public T createInitialValue() { + return typeOps.createMinNegativeValue(); + } + + @Override + public T reduceSingle(T curValue, T valueToReduce) { + if (curValue.compareTo(valueToReduce) < 0) { + typeOps.set(curValue, valueToReduce); + } + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + TypeOpsUtils.writeTypeOps(typeOps, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + typeOps = TypeOpsUtils.readTypeOps(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java ---------------------------------------------------------------------- diff 
--git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java new file mode 100644 index 0000000..9972340 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/MinReduce.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.types.ops.DoubleTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.WritableComparable; + +/** + * Reducer for calculating min of values + * @param <T> Value type + */ +public class MinReduce<T extends WritableComparable> + extends OnSameReduceOperation<T> { + /** DoubleWritable specialization */ + public static final MinReduce<DoubleWritable> DOUBLE = + new MinReduce<>(DoubleTypeOps.INSTANCE); + /** LongWritable specialization */ + public static final MinReduce<LongWritable> LONG = + new MinReduce<>(LongTypeOps.INSTANCE); + + /** Value type operations */ + private NumericTypeOps<T> typeOps; + + /** Constructor used for deserialization only */ + public MinReduce() { + } + + /** + * Constructor + * @param typeOps Value type operations + */ + public MinReduce(NumericTypeOps<T> typeOps) { + this.typeOps = typeOps; + } + + @Override + public T createInitialValue() { + return typeOps.createMaxPositiveValue(); + } + + @Override + public T reduceSingle(T curValue, T valueToReduce) { + if (curValue.compareTo(valueToReduce) > 0) { + typeOps.set(curValue, valueToReduce); + } + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + TypeOpsUtils.writeTypeOps(typeOps, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + typeOps = TypeOpsUtils.readTypeOps(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java ---------------------------------------------------------------------- diff 
--git a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java new file mode 100644 index 0000000..3138733 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/SumReduce.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.reducers.impl; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.reducers.OnSameReduceOperation; +import org.apache.giraph.types.ops.DoubleTypeOps; +import org.apache.giraph.types.ops.LongTypeOps; +import org.apache.giraph.types.ops.NumericTypeOps; +import org.apache.giraph.types.ops.TypeOpsUtils; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Writable; + +/** + * Reducer for calculating sum of values + * @param <T> Value type + */ +public class SumReduce<T extends Writable> + extends OnSameReduceOperation<T> { + /** DoubleWritable specialization */ + public static final SumReduce<DoubleWritable> DOUBLE = + new SumReduce<>(DoubleTypeOps.INSTANCE); + /** LongWritable specialization */ + public static final SumReduce<LongWritable> LONG = + new SumReduce<>(LongTypeOps.INSTANCE); + + /** Value type operations */ + private NumericTypeOps<T> typeOps; + + /** Constructor used for deserialization only */ + public SumReduce() { + } + + /** + * Constructor + * @param typeOps Value type operations + */ + public SumReduce(NumericTypeOps<T> typeOps) { + this.typeOps = typeOps; + } + + @Override + public T createInitialValue() { + return typeOps.createZero(); + } + + @Override + public T reduceSingle(T curValue, T valueToReduce) { + typeOps.plusInto(curValue, valueToReduce); + return curValue; + } + + @Override + public void write(DataOutput out) throws IOException { + TypeOpsUtils.writeTypeOps(typeOps, out); + } + + @Override + public void readFields(DataInput in) throws IOException { + typeOps = TypeOpsUtils.readTypeOps(in); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java new file mode 100644 index 0000000..ba61ce8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/reducers/impl/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Package of Giraph reducers. 
+ */ +package org.apache.giraph.reducers.impl; http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java index af8c38f..1ca7796 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/DoubleTypeOps.java @@ -21,7 +21,9 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicDoubleArrayLi import org.apache.hadoop.io.DoubleWritable; /** TypeOps implementation for working with DoubleWritable type */ -public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> { +public enum DoubleTypeOps + implements PrimitiveTypeOps<DoubleWritable>, + NumericTypeOps<DoubleWritable> { /** Singleton instance */ INSTANCE(); @@ -49,4 +51,34 @@ public enum DoubleTypeOps implements PrimitiveTypeOps<DoubleWritable> { public BasicDoubleArrayList createArrayList(int capacity) { return new BasicDoubleArrayList(capacity); } + + @Override + public DoubleWritable createMinNegativeValue() { + return new DoubleWritable(Double.NEGATIVE_INFINITY); + } + + @Override + public DoubleWritable createMaxPositiveValue() { + return new DoubleWritable(Double.POSITIVE_INFINITY); + } + + @Override + public DoubleWritable createZero() { + return new DoubleWritable(0); + } + + @Override + public void plusInto(DoubleWritable value, DoubleWritable increment) { + value.set(value.get() + increment.get()); + } + + @Override + public void multiplyInto(DoubleWritable value, DoubleWritable multiplier) { + value.set(value.get() * multiplier.get()); + } + + @Override + public void negate(DoubleWritable value) { + value.set(-value.get()); + } } 
http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java index 3ca8409..3c69868 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/FloatTypeOps.java @@ -21,7 +21,8 @@ import org.apache.giraph.types.ops.collections.BasicArrayList.BasicFloatArrayLis import org.apache.hadoop.io.FloatWritable; /** TypeOps implementation for working with FloatWritable type */ -public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> { +public enum FloatTypeOps + implements PrimitiveTypeOps<FloatWritable>, NumericTypeOps<FloatWritable> { /** Singleton instance */ INSTANCE(); @@ -49,4 +50,34 @@ public enum FloatTypeOps implements PrimitiveTypeOps<FloatWritable> { public BasicFloatArrayList createArrayList(int capacity) { return new BasicFloatArrayList(capacity); } + + @Override + public FloatWritable createMinNegativeValue() { + return new FloatWritable(Float.NEGATIVE_INFINITY); + } + + @Override + public FloatWritable createMaxPositiveValue() { + return new FloatWritable(Float.POSITIVE_INFINITY); + } + + @Override + public FloatWritable createZero() { + return new FloatWritable(0); + } + + @Override + public void plusInto(FloatWritable value, FloatWritable increment) { + value.set(value.get() + increment.get()); + } + + @Override + public void multiplyInto(FloatWritable value, FloatWritable multiplier) { + value.set(value.get() * multiplier.get()); + } + + @Override + public void negate(FloatWritable value) { + value.set(-value.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java 
---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java index f9a32c0..57e1b53 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java @@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicIntOpenHashSet; import org.apache.hadoop.io.IntWritable; /** TypeOps implementation for working with IntWritable type */ -public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> { +public enum IntTypeOps + implements PrimitiveIdTypeOps<IntWritable>, NumericTypeOps<IntWritable> { /** Singleton instance */ INSTANCE; @@ -65,4 +66,34 @@ public enum IntTypeOps implements PrimitiveIdTypeOps<IntWritable> { int capacity) { return new BasicInt2ObjectOpenHashMap<>(capacity); } + + @Override + public IntWritable createMinNegativeValue() { + return new IntWritable(Integer.MIN_VALUE); + } + + @Override + public IntWritable createMaxPositiveValue() { + return new IntWritable(Integer.MAX_VALUE); + } + + @Override + public IntWritable createZero() { + return new IntWritable(0); + } + + @Override + public void plusInto(IntWritable value, IntWritable increment) { + value.set(value.get() + increment.get()); + } + + @Override + public void multiplyInto(IntWritable value, IntWritable multiplier) { + value.set(value.get() * multiplier.get()); + } + + @Override + public void negate(IntWritable value) { + value.set(-value.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java index 
4e5ca54..d7fa198 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java @@ -26,7 +26,8 @@ import org.apache.giraph.types.ops.collections.BasicSet.BasicLongOpenHashSet; import org.apache.hadoop.io.LongWritable; /** TypeOps implementation for working with LongWritable type */ -public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> { +public enum LongTypeOps + implements PrimitiveIdTypeOps<LongWritable>, NumericTypeOps<LongWritable> { /** Singleton instance */ INSTANCE; @@ -65,4 +66,34 @@ public enum LongTypeOps implements PrimitiveIdTypeOps<LongWritable> { int capacity) { return new BasicLong2ObjectOpenHashMap<>(capacity); } + + @Override + public LongWritable createMinNegativeValue() { + return new LongWritable(Long.MIN_VALUE); + } + + @Override + public LongWritable createMaxPositiveValue() { + return new LongWritable(Long.MAX_VALUE); + } + + @Override + public LongWritable createZero() { + return new LongWritable(0); + } + + @Override + public void plusInto(LongWritable value, LongWritable increment) { + value.set(value.get() + increment.get()); + } + + @Override + public void multiplyInto(LongWritable value, LongWritable multiplier) { + value.set(value.get() * multiplier.get()); + } + + @Override + public void negate(LongWritable value) { + value.set(-value.get()); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java new file mode 100644 index 0000000..396c914 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/NumericTypeOps.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.types.ops; + +/** + * Numeric type operations, allowing working generically with types, + * but still having efficient code. + * + * Using any of the provided operations should lead to no boxing/unboxing. + * + * @param <T> Type + */ +public interface NumericTypeOps<T> extends TypeOps<T> { + /** + * Minimal negative value representable via current type. + * Negative infinity for floating point numbers. + * @return New object with min negative value + */ + T createMinNegativeValue(); + /** + * Maximal positive value representable via current type. + * Positive infinity for floating point numbers. 
+ * @return New object with max positive value + */ + T createMaxPositiveValue(); + /** + * Value of zero + * @return New object with value of zero + */ + T createZero(); + + /** + * value+=adder + * + * @param value Value to modify + * @param increment Increment + */ + void plusInto(T value, T increment); + /** + * value*=multiplier + * + * @param value Value to modify + * @param multiplier Multiplier + */ + void multiplyInto(T value, T multiplier); + + /** + * -value + * @param value Value to negate + */ + void negate(T value); +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java index b7f9479..c4bd702 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOps.java @@ -23,6 +23,9 @@ package org.apache.giraph.types.ops; * but still having efficient code. * For example, by reducing object allocation via reuse. * + * Use enum singleton pattern, having single enum value - INSTANCE. + * Serialization code depends on implementations being enums. 
+ * * @param <T> Type */ public interface TypeOps<T> { http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java index df5f2bd..785fda1 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/TypeOpsUtils.java @@ -17,6 +17,11 @@ */ package org.apache.giraph.types.ops; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.giraph.utils.WritableUtils; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.DoubleWritable; @@ -146,4 +151,26 @@ public class TypeOpsUtils { type + " not supported in TypeOps"); } } + + /** + * Write TypeOps object into a stream + * @param typeOps type ops instance + * @param output output stream + * @param <T> Corresponding type + */ + public static <T> void writeTypeOps(TypeOps<T> typeOps, + DataOutput output) throws IOException { + WritableUtils.writeEnum((Enum) typeOps, output); + } + + /** + * Read TypeOps object from the stream + * @param input input stream + * @param <O> Concrete TypeOps type + * @return type ops instance + */ + public static <O extends TypeOps<?>> O readTypeOps( + DataInput input) throws IOException { + return (O) WritableUtils.readEnum(input); + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java 
b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java index df5ca24..a96fb69 100644 --- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java +++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicArrayList.java @@ -58,6 +58,17 @@ public abstract class BasicArrayList<T> implements Writable { */ public abstract int size(); /** + * Sets the size of this list. + * + * <P> + * If the specified size is smaller than the current size, + * the last elements are discarded. + * Otherwise, they are filled with 0/<code>null</code>/<code>false</code>. + * + * @param newSize the new size. + */ + public abstract void size(int newSize); + /** * Capacity of currently allocated memory * @return capacity */ @@ -168,6 +179,11 @@ public abstract class BasicArrayList<T> implements Writable { } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } @@ -250,6 +266,11 @@ public abstract class BasicArrayList<T> implements Writable { } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } @@ -332,6 +353,11 @@ public abstract class BasicArrayList<T> implements Writable { } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } @@ -414,6 +440,11 @@ public abstract class BasicArrayList<T> implements Writable { } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } @@ -496,6 +527,11 @@ public abstract class BasicArrayList<T> implements Writable { } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } @@ -578,6 +614,11 @@ public abstract class BasicArrayList<T> implements Writable 
{ } @Override + public void size(int newSize) { + list.size(newSize); + } + + @Override public int capacity() { return list.elements().length; } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java new file mode 100644 index 0000000..89d4e90 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/Varint.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +/** + * This Code is Copied from main/java/org/apache/mahout/math/Varint.java + * + * Only modification is throwing exceptions for passing negative values to + * unsigned functions, instead of serializing them. + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +/** + * <p> + * Encodes signed and unsigned values using a common variable-length scheme, + * found for example in <a + * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> + * Google's Protocol Buffers</a>. It uses fewer bytes to encode smaller values, + * but will use slightly more bytes to encode large values. + * </p> + * <p/> + * <p> + * Signed values are further encoded using so-called zig-zag encoding in order + * to make them "compatible" with variable-length encoding. + * </p> + */ +public final class Varint { + + /** + * private constructor + */ + private Varint() { + } + + /** + * Encodes a value using the variable-length encoding from <a + * href="http://code.google.com/apis/protocolbuffers/docs/encoding.html"> + * Google Protocol Buffers</a>. Zig-zag is not used, so input must not be + * negative. If values can be negative, use + * {@link #writeSignedVarLong(long, DataOutput)} instead. This method treats + * negative input as like a large unsigned value. 
+ * + * @param value + * value to encode + * @param out + * to write bytes to + * @throws IOException + * if {@link DataOutput} throws {@link IOException} + */ + public static void writeUnsignedVarLong( + long value, DataOutput out) throws IOException { + if (value < 0) { + throw new IllegalArgumentException( + "Negative value passed into writeUnsignedVarLong - " + value); + } + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + out.writeByte(((int) value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte((int) value & 0x7F); + } + + /** + * @see #writeUnsignedVarLong(long, DataOutput) + * @param value + * value to encode + * @param out + * to write bytes to + */ + public static void writeUnsignedVarInt( + int value, DataOutput out) throws IOException { + if (value < 0) { + throw new IllegalArgumentException( + "Negative value passed into writeUnsignedVarInt - " + value); + } + while ((value & 0xFFFFFF80) != 0L) { + out.writeByte((value & 0x7F) | 0x80); + value >>>= 7; + } + out.writeByte(value & 0x7F); + } + + /** + * @param in + * to read bytes from + * @return decode value + * @throws IOException + * if {@link DataInput} throws {@link IOException} + * @throws IllegalArgumentException + * if variable-length value does not terminate after 9 bytes have + * been read + * @see #writeUnsignedVarLong(long, DataOutput) + */ + public static long readUnsignedVarLong(DataInput in) throws IOException { + long value = 0L; + int i = 0; + long b = in.readByte(); + while ((b & 0x80L) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 63) { + throw new IllegalArgumentException( + "Variable length quantity is too long"); + } + b = in.readByte(); + } + return value | (b << i); + } + + /** + * @throws IllegalArgumentException + * if variable-length value does not terminate after + * 5 bytes have been read + * @throws IOException + * if {@link DataInput} throws {@link IOException} + * @param in to read bytes from. + * @return decode value. 
+ */ + public static int readUnsignedVarInt(DataInput in) throws IOException { + int value = 0; + int i = 0; + int b = in.readByte(); + while ((b & 0x80) != 0) { + value |= (b & 0x7F) << i; + i += 7; + if (i > 35) { + throw new IllegalArgumentException( + "Variable length quantity is too long"); + } + b = in.readByte(); + } + return value | (b << i); + } + /** + * Simulation for what will happen when writing an unsigned long value + * as varlong. + * @param value the value + * @return the number of bytes needed to write value. + * @throws IOException + */ + public static long sizeOfUnsignedVarLong(long value) throws IOException { + long cnt = 0; + while ((value & 0xFFFFFFFFFFFFFF80L) != 0L) { + cnt++; + value >>>= 7; + } + return ++cnt; + } + + /** + * Simulation for what will happen when writing an unsigned int value + * as varint. + * @param value the value + * @return the number of bytes needed to write value. + * @throws IOException + */ + public static long sizeOfUnsignedVarInt(int value) throws IOException { + long cnt = 0; + while ((value & 0xFFFFFF80) != 0L) { + cnt++; + value >>>= 7; + } + return ++cnt; + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 3c37bec..592ef7e 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -27,6 +27,7 @@ import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; import java.io.IOException; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; @@ -857,4 +858,42 @@ public class WritableUtils { return res; } + /** + * Writes enum 
into a stream, by serializing class name and it's index + * @param enumValue Enum value + * @param output Output stream + * @param <T> Enum type + */ + public static <T extends Enum<T>> void writeEnum(T enumValue, + DataOutput output) throws IOException { + writeClass( + enumValue != null ? enumValue.getDeclaringClass() : null, output); + if (enumValue != null) { + Varint.writeUnsignedVarInt(enumValue.ordinal(), output); + } + } + + /** + * Reads enum from the stream, serialized by writeEnum + * @param input Input stream + * @param <T> Enum type + * @return Enum value + */ + public static <T extends Enum<T>> T readEnum(DataInput input) throws + IOException { + Class<T> clazz = readClass(input); + if (clazz != null) { + int ordinal = Varint.readUnsignedVarInt(input); + try { + T[] values = (T[]) clazz.getDeclaredMethod("values").invoke(null); + return values[ordinal]; + } catch (IllegalAccessException | IllegalArgumentException + | InvocationTargetException | NoSuchMethodException + | SecurityException e) { + throw new IOException("Cannot read enum", e); + } + } else { + return null; + } + } } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java index 916e7a0..6472850 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorDelegator.java @@ -54,6 +54,11 @@ public abstract class WorkerAggregatorDelegator<I extends WritableComparable, } @Override + public void reducePartial(String name, Writable value) { + workerGlobalCommUsage.reducePartial(name, value); + } + + @Override public final <B extends Writable> B 
getBroadcast(String name) { return workerGlobalCommUsage.getBroadcast(name); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java index ee47542..96d239d 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerAggregatorHandler.java @@ -113,7 +113,8 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { * @param name Name of the reducer * @param valueToReduce Partial value to reduce */ - protected void reducePartial(String name, Writable valueToReduce) { + @Override + public void reducePartial(String name, Writable valueToReduce) { Reducer<Object, Writable> reducer = reducerMap.get(name); if (reducer != null) { progressable.progress(); @@ -329,6 +330,19 @@ public class WorkerAggregatorHandler implements WorkerThreadGlobalCommUsage { } @Override + public void reducePartial(String name, Writable value) { + Reducer<Object, Writable> reducer = threadReducerMap.get(name); + if (reducer != null) { + progressable.progress(); + reducer.reducePartial(value); + } else { + throw new IllegalStateException("reducePartial: " + + AggregatorUtils.getUnregisteredAggregatorMessage(name, + threadReducerMap.size() != 0, conf)); + } + } + + @Override public <B extends Writable> B getBroadcast(String name) { return WorkerAggregatorHandler.this.getBroadcast(name); } http://git-wip-us.apache.org/repos/asf/giraph/blob/fda1bb38/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java index 9c2e90d..fe7cd32 100644 --- a/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java +++ b/giraph-core/src/main/java/org/apache/giraph/worker/WorkerReduceUsage.java @@ -17,6 +17,8 @@ */ package org.apache.giraph.worker; +import org.apache.hadoop.io.Writable; + /** * Methods on worker can provide values to reduce through this interface */ @@ -27,4 +29,11 @@ public interface WorkerReduceUsage { * @param value Single value to reduce */ void reduce(String name, Object value); + + /** + * Reduce given partial value. + * @param name Name of the reducer + * @param value Single value to reduce + */ + void reducePartial(String name, Writable value); }
