Repository: giraph Updated Branches: refs/heads/trunk 06a1084af -> 819d6d38d
http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java new file mode 100644 index 0000000..e82c3a8 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/edge/LongDiffNullArrayEdges.java @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.edge; + +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import it.unimi.dsi.fastutil.longs.LongArrayList; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInput; +import java.io.DataInputStream; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.BitSet; +import java.util.Iterator; + +import javax.annotation.concurrent.NotThreadSafe; + +import org.apache.giraph.utils.EdgeIterables; +import org.apache.giraph.utils.Trimmable; +import org.apache.giraph.utils.Varint; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; + +import com.google.common.base.Preconditions; + +/** + * Implementation of {@link org.apache.giraph.edge.OutEdges} with long ids + * and null edge values, backed by a dynamic primitive array. + * Parallel edges are allowed. + * Note: this implementation is optimized for space usage, + * but random access and edge removals are expensive. + * Users of this class should explicitly call {@link #trim()} function + * to compact in-memory representation after all updates are done. + * Compacting object is expensive so should only be done once after bulk update. + * Compaction can also be caused by serialization attempt or + * by calling {@link #iterator()} + */ +@NotThreadSafe +public class LongDiffNullArrayEdges + extends ConfigurableOutEdges<LongWritable, NullWritable> + implements ReuseObjectsOutEdges<LongWritable, NullWritable>, + MutableOutEdges<LongWritable, NullWritable>, Trimmable { + + /** + * Array of target vertex ids. + */ + private byte[] compressedData; + /** + * Number of edges stored in compressed array. + * There may be some extra edges in transientData or there may be some edges + * removed. These will not count here. 
To get real number of elements stored + * in this object @see {@link #size()} + */ + private int size; + + /** + * Last updates are stored here. We clear them out after object is compacted. + */ + private TransientChanges transientData; + + @Override + public void initialize(Iterable<Edge<LongWritable, NullWritable>> edges) { + reset(); + EdgeIterables.initialize(this, edges); + trim(); + } + + @Override + public void initialize(int capacity) { + reset(); + if (capacity > 0) { + transientData = new TransientChanges(capacity); + } + } + + @Override + public void initialize() { + reset(); + } + + @Override + public void add(Edge<LongWritable, NullWritable> edge) { + checkTransientData(); + transientData.add(edge.getTargetVertexId().get()); + } + + + @Override + public void remove(LongWritable targetVertexId) { + checkTransientData(); + long target = targetVertexId.get(); + + if (size > 0) { + LongsDiffReader reader = new LongsDiffReader(compressedData); + for (int i = 0; i < size; i++) { + long cur = reader.readNext(); + if (cur == target) { + transientData.markRemoved(i); + } else if (cur > target) { + break; + } + } + } + transientData.removeAdded(target); + } + + @Override + public int size() { + int result = size; + if (transientData != null) { + result += transientData.size(); + } + return result; + } + + @Override + public Iterator<Edge<LongWritable, NullWritable>> iterator() { + // Returns an iterator that reuses objects. + // The downcast is fine because all concrete Edge implementations are + // mutable, but we only expose the mutation functionality when appropriate. + return (Iterator) mutableIterator(); + } + + @Override + public Iterator<MutableEdge<LongWritable, NullWritable>> mutableIterator() { + trim(); + return new Iterator<MutableEdge<LongWritable, NullWritable>>() { + /** Current position in the array. */ + private int position; + private final LongsDiffReader reader = + new LongsDiffReader(compressedData); + + /** Representative edge object. 
*/ + private final MutableEdge<LongWritable, NullWritable> representativeEdge = + EdgeFactory.createReusable(new LongWritable()); + + @Override + public boolean hasNext() { + return position < size; + } + + @Override + public MutableEdge<LongWritable, NullWritable> next() { + position++; + representativeEdge.getTargetVertexId().set(reader.readNext()); + return representativeEdge; + } + + @Override + public void remove() { + removeAt(position - 1); + } + }; + } + + @Override + public void write(DataOutput out) throws IOException { + trim(); + Varint.writeUnsignedVarInt(compressedData.length, out); + Varint.writeUnsignedVarInt(size, out); + out.write(compressedData); + } + + @Override + public void readFields(DataInput in) throws IOException { + reset(); + compressedData = new byte[Varint.readUnsignedVarInt(in)]; + // We can actually calculate size after data array is read, + // the trade-off is memory vs speed + size = Varint.readUnsignedVarInt(in); + in.readFully(compressedData); + } + + /** + * This function takes all recent updates and stores them efficiently. + * It is safe to call this function multiple times. + */ + @Override + public void trim() { + if (transientData == null) { + // We don't have any updates to this object. Return quickly. + return; + } + + // Beware this array is longer than the number of elements we are interested in + long[] transientValues = transientData.sortedValues(); + int pCompressed = 0; + int pTransient = 0; + + LongsDiffReader reader = new LongsDiffReader(compressedData); + LongsDiffWriter writer = new LongsDiffWriter(); + + long curValue = size > 0 ? reader.readNext() : Long.MAX_VALUE; + + // Here we merge freshly added elements and old elements, we also want + // to prune removed elements. 
Both arrays are sorted so in order to merge + // them, we move two pointers and store result in the new array + while (pTransient < transientData.numberOfAddedElements() || + pCompressed < size) { + if (pTransient < transientData.numberOfAddedElements() && + curValue >= transientValues[pTransient]) { + writer.writeNext(transientValues[pTransient]); + pTransient++; + } else { + if (!transientData.isRemoved(pCompressed)) { + writer.writeNext(curValue); + } + pCompressed++; + if (pCompressed < size) { + curValue = reader.readNext(); + } else { + curValue = Long.MAX_VALUE; + } + } + } + + compressedData = writer.toByteArray(); + size += transientData.size(); + transientData = null; + } + + + /** + * Remove edge at position i. + * + * @param i Position of edge to be removed + */ + private void removeAt(int i) { + checkTransientData(); + if (i < size) { + transientData.markRemoved(i); + } else { + transientData.removeAddedAt(i - size); + } + } + + /** + * Check if transient data needs to be created. + */ + private void checkTransientData() { + if (transientData == null) { + transientData = new TransientChanges(); + } + } + + /** + * Reset object to completely empty state. + */ + private void reset() { + compressedData = ByteArrays.EMPTY_ARRAY; + size = 0; + transientData = null; + } + + /** + * Reading array of longs diff encoded from byte array. 
+ */ + private static class LongsDiffReader { + /** Input stream */ + private final DataInput input; + /** last read value */ + private long current; + /** True if we haven't read any numbers yet */ + private boolean first = true; + + /** + * Construct LongsDiffReader + * @param compressedData Input byte array + */ + LongsDiffReader(byte[] compressedData) { + input = new DataInputStream(new ByteArrayInputStream(compressedData)); + } + + /** + * Read next value from reader + * @return next value + */ + long readNext() { + try { + if (first) { + current = input.readLong(); + first = false; + } else { + current += Varint.readUnsignedVarLong(input); + } + return current; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + } + + /** + * Writing array of longs diff encoded into the byte array. + */ + private static class LongsDiffWriter { + /** Byte array stream containing result */ + private final ByteArrayOutputStream resultStream = + new ByteArrayOutputStream(); + /** Wrapping resultStream into DataOutputStream */ + private final DataOutputStream out = new DataOutputStream(resultStream); + /** last value written */ + private long lastWritten; + /** True if we haven't written any numbers yet */ + private boolean first = true; + + /** + * Write next value to writer + * @param value Value to be written + */ + void writeNext(long value) { + try { + if (first) { + out.writeLong(value); + first = false; + } else { + Preconditions.checkState(value >= lastWritten, + "Values need to be in order"); + Preconditions.checkState((value - lastWritten) >= 0, + "In order to use this class, difference of consecutive IDs " + + "cannot overflow longs"); + Varint.writeUnsignedVarLong(value - lastWritten, out); + } + lastWritten = value; + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + /** + * Get resulting byte array + * @return resulting byte array + */ + byte[] toByteArray() { + return resultStream.toByteArray(); + } + } + + /** + * 
Temporary storage for all updates. + * We don't want to update compressed array frequently so we only update it + * on request at the same time we allow temporary updates to persist in this + * class. + */ + private static class TransientChanges { + /** Neighbors that were added since last flush */ + private final LongArrayList neighborsAdded; + /** Removed indices in original array */ + private final BitSet removed = new BitSet(); + /** Number of values removed */ + private int removedCount; + + /** + * Construct transient changes with given capacity + * @param capacity capacity + */ + private TransientChanges(int capacity) { + neighborsAdded = new LongArrayList(capacity); + } + + /** + * Construct transient changes + */ + private TransientChanges() { + neighborsAdded = new LongArrayList(); + } + + /** + * Add new value + * @param value value to add + */ + private void add(long value) { + neighborsAdded.add(value); + } + + /** + * Mark given index to remove + * @param index Index to remove + */ + private void markRemoved(int index) { + if (!removed.get(index)) { + removedCount++; + removed.set(index); + } + } + + /** + * Remove value from neighborsAdded + * @param index Position to remove from + */ + private void removeAddedAt(int index) { + // The order of the edges is irrelevant, so we can simply replace + // the deleted edge with the rightmost element, thus achieving constant + // time. 
+ if (index == neighborsAdded.size() - 1) { + neighborsAdded.popLong(); + } else { + neighborsAdded.set(index, neighborsAdded.popLong()); + } + } + + /** + * Number of added elements + * @return number of added elements + */ + private int numberOfAddedElements() { + return neighborsAdded.size(); + } + + /** + * Remove added value + * @param target value to remove + */ + private void removeAdded(long target) { + neighborsAdded.rem(target); + } + + /** + * Additional size in transient changes + * @return additional size + */ + private int size() { + return neighborsAdded.size() - removedCount; + } + + /** + * Sorted added values + * @return sorted added values + */ + private long[] sortedValues() { + long[] ret = neighborsAdded.elements(); + Arrays.sort(ret, 0, neighborsAdded.size()); + return ret; + } + + /** + * Check if index was removed + * @param i Index to check + * @return Whether it was removed + */ + private boolean isRemoved(int i) { + return removed.get(i); + } + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java new file mode 100644 index 0000000..ae31bb2 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/ConfigurationObjectUtils.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import org.apache.giraph.writable.kryo.KryoWritableWrapper; +import org.apache.hadoop.conf.Configuration; + +/** + * Utility methods for dealing with Hadoop configuration + */ +public class ConfigurationObjectUtils { + /** Hide constructor */ + private ConfigurationObjectUtils() { + } + + /** + * Encode bytes to a hex String + * + * @param bytes byte[] + * @return encoded String + */ + public static String encodeBytes(byte[] bytes) { + StringBuilder strBuf = new StringBuilder(); + for (int i = 0; i < bytes.length; i++) { + strBuf.append((char) (((bytes[i] >> 4) & 0xF) + ('a'))); + strBuf.append((char) (((bytes[i]) & 0xF) + ('a'))); + } + return strBuf.toString(); + } + + /** + * Decode bytes from a hex String + * + * @param str String to decode + * @return decoded byte[] + */ + public static byte[] decodeBytes(String str) { + byte[] bytes = new byte[str.length() / 2]; + for (int i = 0; i < str.length(); i += 2) { + char c = str.charAt(i); + bytes[i / 2] = (byte) ((c - 'a') << 4); + c = str.charAt(i + 1); + bytes[i / 2] += c - 'a'; + } + return bytes; + } + + /** + * Set byte array to a conf option + * + * @param data Byte array + * @param confOption Conf option + * @param conf Configuration + */ + public static void setByteArray(byte[] data, String confOption, + Configuration conf) { + conf.set(confOption, encodeBytes(data)); + } + + /** + * Get byte array from a conf option + * + * @param confOption Conf option + * @param conf Configuration + * @return Byte array + */ + public static byte[] getByteArray(String 
confOption, + Configuration conf) { + return decodeBytes(conf.get(confOption)); + } + + /** + * Set object in a conf option using kryo + * + * @param object Object to set + * @param confOption Conf option + * @param conf Configuration + * @param <T> Type of the object + */ + public static <T> void setObjectKryo(T object, String confOption, + Configuration conf) { + setByteArray(WritableUtils.toByteArrayUnsafe( + new KryoWritableWrapper<>(object)), + confOption, conf); + } + + /** + * Get object from a conf option using kryo + * + * @param confOption Conf option + * @param conf Configuration + * @return Object from conf + * @param <T> Type of the object + */ + public static <T> T getObjectKryo(String confOption, + Configuration conf) { + KryoWritableWrapper<T> wrapper = new KryoWritableWrapper<>(); + WritableUtils.fromByteArrayUnsafe( + getByteArray(confOption, conf), wrapper, + new UnsafeReusableByteArrayInput()); + return wrapper.get(); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java b/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java new file mode 100644 index 0000000..ef273b0 --- /dev/null +++ b/giraph-core/src/main/java/org/apache/giraph/utils/DefaultOutputCommitter.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.giraph.utils; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Output committer which has abstract commit method + */ +public abstract class DefaultOutputCommitter extends OutputCommitter { + /** + * For cleaning up the job's output after job completion. Note that this + * is invoked for jobs with final run state as + * {@link org.apache.hadoop.mapreduce.JobStatus.State#SUCCEEDED} + * + * @param jobContext Context of the job whose output is being written. + */ + public abstract void commit(JobContext jobContext) throws IOException; + + @Override + public final void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public final void setupTask(TaskAttemptContext taskContext) + throws IOException { + } + + @Override + public final void commitJob(JobContext jobContext) + throws IOException { + super.commitJob(jobContext); + commit(jobContext); + } + + @Override + public final boolean needsTaskCommit(TaskAttemptContext taskContext) + throws IOException { + // Digraph does not require a task commit and there is a bug in Corona + // which triggers t5688706 + // Avoiding the task commit should work around this. 
+ return false; + } + + @Override + public final void commitTask(TaskAttemptContext context) throws IOException { + } + + @Override + public final void abortTask(TaskAttemptContext taskContext) + throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java index 68ed89a..2229c2f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java +++ b/giraph-core/src/main/java/org/apache/giraph/utils/WritableUtils.java @@ -34,6 +34,7 @@ import java.util.List; import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; import org.apache.giraph.edge.Edge; import org.apache.giraph.edge.OutEdges; +import org.apache.giraph.factories.ValueFactory; import org.apache.giraph.graph.Vertex; import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperExt.PathStat; @@ -950,4 +951,190 @@ public class WritableUtils { } return copy; } + + /** + * Create a copy of Writable object, by serializing and deserializing it. + * + * @param original Original value of which to make a copy + * @return Copy of the original value + * @param <T> Type of the object + */ + public static final <T extends Writable> T createCopy(T original) { + return (T) createCopy(original, original.getClass(), null); + } + + /** + * Create a copy of Writable object, by serializing and deserializing it. + * + * @param original Original value of which to make a copy + * @param outputClass Expected copy class, needs to match original + * @param conf Configuration + * @return Copy of the original value + * @param <T> Type of the object + */ + public static final <T extends Writable> + T createCopy(T original, Class<? 
extends T> outputClass, + ImmutableClassesGiraphConfiguration conf) { + T result = WritableUtils.createWritable(outputClass, conf); + copyInto(original, result); + return result; + } + + /** + * Create a copy of Writable object, by serializing and deserializing it. + * + * @param original Original value of which to make a copy + * @param classFactory Factory to create new empty object from + * @param conf Configuration + * @return Copy of the original value + * @param <T> Type of the object + */ + public static final <T extends Writable> + T createCopy(T original, ValueFactory<T> classFactory, + ImmutableClassesGiraphConfiguration conf) { + T result = classFactory.newInstance(); + copyInto(original, result); + return result; + } + + /** + * Serialize given writable object, and return its size. + * + * @param w Writable object + * @return its size after serialization + */ + public static int size(Writable w) { + try { + ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput(); + w.write(out); + return out.getPos(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Serialize given writable to byte array, + * using new instance of ExtendedByteArrayDataOutput. + * + * @param w Writable object + * @return array of bytes + * @param <T> Type of the object + */ + public static <T extends Writable> byte[] toByteArray(T w) { + try { + ExtendedByteArrayDataOutput out = new ExtendedByteArrayDataOutput(); + w.write(out); + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Deserialize from given byte array into given writable, + * using new instance of ExtendedByteArrayDataInput. 
+ * + * @param data Byte array representing writable + * @param to Object to fill + * @param <T> Type of the object + */ + public static <T extends Writable> void fromByteArray(byte[] data, T to) { + try { + ExtendedByteArrayDataInput in = + new ExtendedByteArrayDataInput(data, 0, data.length); + to.readFields(in); + + if (in.available() != 0) { + throw new RuntimeException( + "Serialization encountered issues, " + in.available() + + " bytes left to be read"); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Serialize given writable to byte array, + * using new instance of UnsafeByteArrayOutputStream. + * + * @param w Writable object + * @return array of bytes + * @param <T> Type of the object + */ + public static <T extends Writable> byte[] toByteArrayUnsafe(T w) { + try { + UnsafeByteArrayOutputStream out = new UnsafeByteArrayOutputStream(); + w.write(out); + return out.toByteArray(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Deserialize from given byte array into given writable, + * using given reusable UnsafeReusableByteArrayInput. 
+ * + * @param data Byte array representing writable + * @param to Object to fill + * @param reusableInput Reusable input to use + * @param <T> Type of the object + */ + public static <T extends Writable> void fromByteArrayUnsafe( + byte[] data, T to, UnsafeReusableByteArrayInput reusableInput) { + try { + reusableInput.initialize(data, 0, data.length); + to.readFields(reusableInput); + + if (reusableInput.available() != 0) { + throw new RuntimeException( + "Serialization encountered issues, " + reusableInput.available() + + " bytes left to be read"); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * First write a boolean saying whether an object is not null, + * and if it's not write the object + * + * @param object Object to write + * @param out DataOutput to write to + * @param <T> Object type + */ + public static <T extends Writable> void writeIfNotNullAndObject(T object, + DataOutput out) throws IOException { + out.writeBoolean(object != null); + if (object != null) { + object.write(out); + } + } + + /** + * First read a boolean saying whether an object is not null, + * and if it's not read the object + * + * @param reusableObject Reuse this object instance + * @param objectClass Class of the object, to create if reusableObject is null + * @param in DataInput to read from + * @param <T> Object type + * @return Object, or null + */ + public static <T extends Writable> T readIfNotNullAndObject(T reusableObject, + Class<T> objectClass, DataInput in) throws IOException { + if (in.readBoolean()) { + if (reusableObject == null) { + reusableObject = ReflectionUtils.newInstance(objectClass); + } + reusableObject.readFields(in); + return reusableObject; + } else { + return null; + } + } + } http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java ---------------------------------------------------------------------- diff --git 
a/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java b/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java new file mode 100644 index 0000000..e23f592 --- /dev/null +++ b/giraph-core/src/test/java/org/apache/giraph/edge/LongDiffNullArrayEdgesTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.giraph.edge; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.giraph.conf.GiraphConfiguration; +import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration; +import org.apache.giraph.utils.UnsafeByteArrayInputStream; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; +import org.junit.Assert; +import org.junit.Test; + +import com.google.common.collect.Lists; + + +public class LongDiffNullArrayEdgesTest { + private static Edge<LongWritable, NullWritable> createEdge(long id) { + return EdgeFactory.create(new LongWritable(id)); + } + + private static void assertEdges(LongDiffNullArrayEdges edges, long... 
expected) { + int index = 0; + for (Edge<LongWritable, NullWritable> edge : edges) { + Assert.assertEquals(expected[index], edge.getTargetVertexId().get()); + index++; + } + Assert.assertEquals(expected.length, index); + } + + @Test + public void testEdges() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(1), createEdge(2), createEdge(4)); + + edges.initialize(initialEdges); + assertEdges(edges, 1, 2, 4); + + edges.add(EdgeFactory.createReusable(new LongWritable(3))); + assertEdges(edges, 1, 2, 3, 4); + + edges.remove(new LongWritable(2)); + assertEdges(edges, 1, 3, 4); + } + + @Test + public void testPositiveAndNegativeEdges() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(1), createEdge(-2), createEdge(3), createEdge(-4)); + + edges.initialize(initialEdges); + assertEdges(edges, -4, -2, 1, 3); + + edges.add(EdgeFactory.createReusable(new LongWritable(5))); + assertEdges(edges, -4, -2, 1, 3, 5); + + edges.remove(new LongWritable(-2)); + assertEdges(edges, -4, 1, 3, 5); + } + + @Test + public void testMutateEdges() { + LongDiffNullArrayEdges edges = getEdges(); + + edges.initialize(); + + // Add 10 edges with id i, for i = 0..9 + for (int i = 0; i < 10; ++i) { + edges.add(createEdge(i)); + } + + // Use the mutable iterator to remove edges with even id + Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt = + edges.mutableIterator(); + while (edgeIt.hasNext()) { + if (edgeIt.next().getTargetVertexId().get() % 2 == 0) { + edgeIt.remove(); + } + } + + // We should now have 5 edges + assertEquals(5, edges.size()); + // The edge ids should be all odd + for (Edge<LongWritable, NullWritable> edge : edges) { + assertEquals(1, edge.getTargetVertexId().get() % 2); + } + } + + @Test + public void testSerialization() throws IOException { + LongDiffNullArrayEdges edges = getEdges(); + + 
edges.initialize(); + + // Add 10 edges with id i, for i = 0..9 + for (int i = 0; i < 10; ++i) { + edges.add(createEdge(i)); + } + + edges.trim(); + + // Use the mutable iterator to remove edges with even id + Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt = + edges.mutableIterator(); + while (edgeIt.hasNext()) { + if (edgeIt.next().getTargetVertexId().get() % 2 == 0) { + edgeIt.remove(); + } + } + + // We should now have 5 edges + assertEdges(edges, 1, 3, 5, 7, 9); + + ByteArrayOutputStream arrayStream = new ByteArrayOutputStream(); + DataOutputStream tempBuffer = new DataOutputStream(arrayStream); + + edges.write(tempBuffer); + + byte[] binary = arrayStream.toByteArray(); + + assertTrue("Serialized version should not be empty ", binary.length > 0); + + edges = getEdges(); + edges.readFields(new UnsafeByteArrayInputStream(binary)); + + assertEquals(5, edges.size()); + + for (Edge<LongWritable, NullWritable> edge : edges) { + assertEquals(1, edge.getTargetVertexId().get() % 2); + } + } + + @Test + public void testParallelEdges() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(2), createEdge(2), createEdge(2)); + + edges.initialize(initialEdges); + assertEquals(3, edges.size()); + + edges.remove(new LongWritable(2)); + assertEquals(0, edges.size()); + + edges.add(EdgeFactory.create(new LongWritable(2))); + assertEquals(1, edges.size()); + + edges.trim(); + assertEquals(1, edges.size()); + } + + @Test + public void testEdgeValues() { + LongDiffNullArrayEdges edges = getEdges(); + Set<Long> testValues = new HashSet<Long>(); + testValues.add(0L); + testValues.add((long) Integer.MAX_VALUE); + testValues.add(Long.MAX_VALUE); + + // shouldn't be working with negative IDs + // testValues.add((long) Integer.MIN_VALUE); + // testValues.add(Long.MIN_VALUE); + + List<Edge<LongWritable, NullWritable>> initialEdges = + new ArrayList<Edge<LongWritable, NullWritable>>(); + for(Long 
id : testValues) { + initialEdges.add(createEdge(id)); + } + + edges.initialize(initialEdges); + edges.trim(); + + Iterator<MutableEdge<LongWritable, NullWritable>> edgeIt = + edges.mutableIterator(); + while (edgeIt.hasNext()) { + long value = edgeIt.next().getTargetVertexId().get(); + assertTrue("Unknown edge found " + value, testValues.remove(value)); + } + } + + private LongDiffNullArrayEdges getEdges() { + GiraphConfiguration gc = new GiraphConfiguration(); + ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable> conf = + new ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable>(gc); + LongDiffNullArrayEdges ret = new LongDiffNullArrayEdges(); + ret.setConf(new ImmutableClassesGiraphConfiguration<LongWritable, Writable, NullWritable>(conf)); + return ret; + } + + @Test + public void testAddedSmalerValues() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(100)); + + edges.initialize(initialEdges); + + edges.trim(); + + for (int i=0; i<16; i++) { + edges.add(createEdge(i)); + } + + edges.trim(); + + assertEquals(17, edges.size()); + } + + @Test(expected=IllegalStateException.class) + public void testFailSafeOnPotentialOverflow() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(5223372036854775807L), createEdge(-4223372036854775807L)); + edges.initialize(initialEdges); + } + + @Test + public void testAvoidOverflowWithZero() { + LongDiffNullArrayEdges edges = getEdges(); + + List<Edge<LongWritable, NullWritable>> initialEdges = Lists.newArrayList( + createEdge(5223372036854775807L), createEdge(-4223372036854775807L), createEdge(0)); + edges.initialize(initialEdges); + assertEdges(edges, -4223372036854775807L, 0, 5223372036854775807L); + } +} http://git-wip-us.apache.org/repos/asf/giraph/blob/819d6d38/pom.xml 
---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 63a9bae..4198f13 100644 --- a/pom.xml +++ b/pom.xml @@ -275,6 +275,8 @@ under the License. <!-- This lets modules skip unit tests. More details: GIRAPH-957 --> <surefire.skip>false</surefire.skip> + <checkstyle.config.path>${top.dir}/checkstyle.xml</checkstyle.config.path> + <dep.avro.version>1.7.6</dep.avro.version> <dep.accumulo.version>1.4.0</dep.accumulo.version> <dep.asm.version>3.2</dep.asm.version> @@ -541,7 +543,7 @@ under the License. <artifactId>maven-checkstyle-plugin</artifactId> <version>2.15</version> <configuration> - <configLocation>${top.dir}/checkstyle.xml</configLocation> + <configLocation>${checkstyle.config.path}</configLocation> <consoleOutput>true</consoleOutput> <enableRulesSummary>false</enableRulesSummary> <headerLocation>${top.dir}/license-header.txt</headerLocation> @@ -837,7 +839,7 @@ under the License. <artifactId>maven-checkstyle-plugin</artifactId> <version>2.10</version> <configuration> - <configLocation>${top.dir}/checkstyle.xml</configLocation> + <configLocation>${checkstyle.config.path}</configLocation> <consoleOutput>true</consoleOutput> <enableRulesSummary>false</enableRulesSummary> <headerLocation>${top.dir}/license-header.txt</headerLocation> @@ -1713,6 +1715,11 @@ under the License. </dependency> <dependency> <groupId>org.apache.giraph</groupId> + <artifactId>giraph-block-app</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.giraph</groupId> <artifactId>giraph-examples</artifactId> <version>${project.version}</version> </dependency> @@ -2276,6 +2283,7 @@ under the License. <modules> <module>giraph-core</module> + <module>giraph-block-app</module> <module>giraph-examples</module> </modules>
