Repository: giraph
Updated Branches:
  refs/heads/trunk daa8f92f7 -> f183402ea


fixing large sets

Summary:
Sets with a large number of elements (>800000000) are not supported by
IntOpenHashSet and LongOpenHashSet.
Changing them to IntOpenHashBigSet and LongOpenHashBigSet when the requested
capacity exceeds that threshold.

ArrayLists have the same problem, but we'll postpone that fix until we have a
use case.

https://issues.apache.org/jira/browse/GIRAPH-1028

Test Plan:
mvn clean install
See also the new test TestCollections (its large-set test needs 32G to run).

Reviewers: sergey.edunov, maja.kabiljo, ikabiljo

Reviewed By: ikabiljo

Differential Revision: https://reviews.facebook.net/D44859


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/f183402e
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/f183402e
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/f183402e

Branch: refs/heads/trunk
Commit: f183402ea3aa777fd7fe822eb794837a8e0b3478
Parents: daa8f92
Author: spupyrev <[email protected]>
Authored: Tue Sep 8 16:05:29 2015 -0700
Committer: Igor Kabiljo <[email protected]>
Committed: Thu Sep 10 13:37:34 2015 -0700

----------------------------------------------------------------------
 .../org/apache/giraph/types/ops/IntTypeOps.java |  2 +-
 .../apache/giraph/types/ops/LongTypeOps.java    |  2 +-
 .../giraph/types/ops/PrimitiveIdTypeOps.java    |  2 +-
 .../giraph/types/ops/collections/BasicSet.java  | 83 ++++++++++++++------
 .../apache/giraph/types/TestCollections.java    | 63 +++++++++++++++
 5 files changed, 127 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/f183402e/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
index d159834..5ab9631 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/IntTypeOps.java
@@ -68,7 +68,7 @@ public enum IntTypeOps
   }
 
   @Override
-  public BasicSet<IntWritable> createOpenHashSet(int capacity) {
+  public BasicSet<IntWritable> createOpenHashSet(long capacity) {
     return new BasicIntOpenHashSet(capacity);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f183402e/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
index 741a7a9..b588aed 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/LongTypeOps.java
@@ -68,7 +68,7 @@ public enum LongTypeOps
   }
 
   @Override
-  public BasicSet<LongWritable> createOpenHashSet(int capacity) {
+  public BasicSet<LongWritable> createOpenHashSet(long capacity) {
     return new BasicLongOpenHashSet(capacity);
   }
 

http://git-wip-us.apache.org/repos/asf/giraph/blob/f183402e/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
index b157885..95d894b 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/PrimitiveIdTypeOps.java
@@ -48,7 +48,7 @@ public interface PrimitiveIdTypeOps<T> extends PrimitiveTypeOps<T> {
    * @param capacity Capacity
    * @return BasicSet
    */
-  BasicSet<T> createOpenHashSet(int capacity);
+  BasicSet<T> createOpenHashSet(long capacity);
 
   /**
    * Create Basic2ObjectMap with key type T.

http://git-wip-us.apache.org/repos/asf/giraph/blob/f183402e/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicSet.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicSet.java b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicSet.java
index 65a614e..9194169 100644
--- a/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicSet.java
+++ b/giraph-core/src/main/java/org/apache/giraph/types/ops/collections/BasicSet.java
@@ -18,9 +18,13 @@
 package org.apache.giraph.types.ops.collections;
 
 import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.ints.IntOpenHashBigSet;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
 import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashBigSet;
 import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -41,19 +45,27 @@ import org.apache.hadoop.io.Writable;
  * @param <T> Element type
  */
 public interface BasicSet<T> extends Writable {
+  /** Threshold for using OpenHashSet and OpenHashBigSet implementations. */
+  long MAX_OPEN_HASHSET_CAPACITY = 800000000;
+
   /** Removes all of the elements from this list. */
   void clear();
+
   /**
    * Number of elements in this list
+   *
    * @return size
    */
-  int size();
+  long size();
+
   /**
    * Makes sure set is not using space with capacity more than
    * max(n,size()) entries.
+   *
    * @param n the threshold for the trimming.
    */
-  void trim(int n);
+  void trim(long n);
+
   /**
    * Adds value to the set.
    * Returns <tt>true</tt> if set changed as a
@@ -63,8 +75,10 @@ public interface BasicSet<T> extends Writable {
    * @return true if set was changed.
    */
   boolean add(T value);
+
   /**
    * Checks whether set contains given value
+   *
    * @param value Value to check
    * @return true if value is present in the set
    */
@@ -72,15 +86,16 @@ public interface BasicSet<T> extends Writable {
 
   /**
    * TypeOps for type of elements this object holds
+   *
    * @return TypeOps
    */
   PrimitiveIdTypeOps<T> getElementTypeOps();
 
   /** IntWritable implementation of BasicSet */
   public static final class BasicIntOpenHashSet
-      implements BasicSet<IntWritable> {
+    implements BasicSet<IntWritable> {
     /** Set */
-    private final IntOpenHashSet set;
+    private final IntSet set;
 
     /** Constructor */
     public BasicIntOpenHashSet() {
@@ -89,10 +104,15 @@ public interface BasicSet<T> extends Writable {
 
     /**
      * Constructor
+     *
      * @param capacity Capacity
      */
-    public BasicIntOpenHashSet(int capacity) {
-      set = new IntOpenHashSet(capacity);
+    public BasicIntOpenHashSet(long capacity) {
+      if (capacity <= MAX_OPEN_HASHSET_CAPACITY) {
+        set = new IntOpenHashSet((int) capacity);
+      } else {
+        set = new IntOpenHashBigSet(capacity);
+      }
     }
 
     @Override
@@ -101,13 +121,20 @@ public interface BasicSet<T> extends Writable {
     }
 
     @Override
-    public int size() {
+    public long size() {
+      if (set instanceof IntOpenHashBigSet) {
+        return ((IntOpenHashBigSet) set).size64();
+      }
       return set.size();
     }
 
     @Override
-    public void trim(int n) {
-      set.trim(Math.max(set.size(), n));
+    public void trim(long n) {
+      if (set instanceof IntOpenHashSet) {
+        ((IntOpenHashSet) set).trim((int) Math.max(set.size(), n));
+      } else {
+        ((IntOpenHashBigSet) set).trim(Math.max(set.size(), n));
+      }
     }
 
     @Override
@@ -136,10 +163,10 @@ public interface BasicSet<T> extends Writable {
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int size = in.readInt();
+      long size = in.readLong();
       set.clear();
-      set.trim(size);
-      for (int i = 0; i < size; ++i) {
+      trim(size);
+      for (long i = 0; i < size; ++i) {
         set.add(in.readInt());
       }
     }
@@ -147,9 +174,9 @@ public interface BasicSet<T> extends Writable {
 
   /** LongWritable implementation of BasicSet */
   public static final class BasicLongOpenHashSet
-      implements BasicSet<LongWritable> {
+    implements BasicSet<LongWritable> {
     /** Set */
-    private final LongOpenHashSet set;
+    private final LongSet set;
 
     /** Constructor */
     public BasicLongOpenHashSet() {
@@ -158,10 +185,15 @@ public interface BasicSet<T> extends Writable {
 
     /**
      * Constructor
+     *
      * @param capacity Capacity
      */
-    public BasicLongOpenHashSet(int capacity) {
-      set = new LongOpenHashSet(capacity);
+    public BasicLongOpenHashSet(long capacity) {
+      if (capacity <= MAX_OPEN_HASHSET_CAPACITY) {
+        set = new LongOpenHashSet((int) capacity);
+      } else {
+        set = new LongOpenHashBigSet(capacity);
+      }
     }
 
     @Override
@@ -170,13 +202,20 @@ public interface BasicSet<T> extends Writable {
     }
 
     @Override
-    public int size() {
+    public long size() {
+      if (set instanceof LongOpenHashBigSet) {
+        return ((LongOpenHashBigSet) set).size64();
+      }
       return set.size();
     }
 
     @Override
-    public void trim(int n) {
-      set.trim(Math.max(set.size(), n));
+    public void trim(long n) {
+      if (set instanceof LongOpenHashSet) {
+        ((LongOpenHashSet) set).trim((int) Math.max(set.size(), n));
+      } else {
+        ((LongOpenHashBigSet) set).trim(Math.max(set.size(), n));
+      }
     }
 
     @Override
@@ -196,7 +235,7 @@ public interface BasicSet<T> extends Writable {
 
     @Override
     public void write(DataOutput out) throws IOException {
-      out.writeInt(set.size());
+      out.writeLong(set.size());
       LongIterator iter = set.iterator();
       while (iter.hasNext()) {
         out.writeLong(iter.nextLong());
@@ -205,10 +244,10 @@ public interface BasicSet<T> extends Writable {
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      int size = in.readInt();
+      long size = in.readLong();
       set.clear();
       trim(size);
-      for (int i = 0; i < size; ++i) {
+      for (long i = 0; i < size; ++i) {
         set.add(in.readLong());
       }
     }

http://git-wip-us.apache.org/repos/asf/giraph/blob/f183402e/giraph-core/src/test/java/org/apache/giraph/types/TestCollections.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/test/java/org/apache/giraph/types/TestCollections.java b/giraph-core/src/test/java/org/apache/giraph/types/TestCollections.java
new file mode 100644
index 0000000..2443147
--- /dev/null
+++ b/giraph-core/src/test/java/org/apache/giraph/types/TestCollections.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.types;
+
+import org.apache.giraph.types.ops.LongTypeOps;
+import org.apache.giraph.types.ops.collections.BasicArrayList;
+import org.apache.giraph.types.ops.collections.BasicSet;
+import org.apache.hadoop.io.LongWritable;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test basic collections
+ */
+public class TestCollections {
+  @Test
+  public void testBasicSet() {
+    BasicSet<LongWritable> longSet = LongTypeOps.INSTANCE.createOpenHashSet();
+    long count = 13;
+    for (long i = 1, j = 0; j < count; i *= 10, j++) {
+      longSet.add(new LongWritable(i));
+    }
+    Assert.assertEquals(count, longSet.size());
+
+    longSet.clear();
+    Assert.assertEquals(0, longSet.size());
+  }
+
+  @Test
+  @Ignore("this test requires 32G to run")
+  public void testLargeBasicSet() {
+    long capacity = 1234567890;
+    BasicSet<LongWritable> longSet = LongTypeOps.INSTANCE.createOpenHashSet(capacity);
+    longSet.add(new LongWritable(capacity));
+    longSet.add(new LongWritable(capacity));
+    Assert.assertEquals(1, longSet.size());
+  }
+
+  @Test
+  public void testLargeBasicList() {
+    int capacity = 123456789;
+    BasicArrayList<LongWritable> longSet = LongTypeOps.INSTANCE.createArrayList(capacity);
+    longSet.add(new LongWritable(capacity));
+    longSet.add(new LongWritable(capacity));
+    Assert.assertEquals(2, longSet.size());
+  }
+}

Reply via email to