Author: xuefu
Date: Fri Jun 12 01:58:31 2015
New Revision: 1685016

URL: http://svn.apache.org/r1685016
Log:
PIG-4595: Fix unit test failures about TestFRJoinNullValue in spark mode (Liyun 
via Xuefu)

Added:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
Modified:
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
    
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
    pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java?rev=1685016&r1=1685015&r2=1685016&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/FRJoinConverter.java
 Fri Jun 12 01:58:31 2015
@@ -52,16 +52,16 @@ public class FRJoinConverter implements
                               POFRJoin poFRJoin) throws IOException {
 
         SparkUtil.assertPredecessorSizeGreaterThan(predecessors, poFRJoin, 1);
-        JavaPairRDD<Object, Tuple2<Tuple, Tuple>> joinedPairRDD;
+        JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> joinedPairRDD;
         int lr_idx = 0;
-        // RDD<Tuple> -> RDD<Tuple2<Object, Tuple>> -> JavaPairRDD<Object, 
Tuple>
-        JavaPairRDD<Object, Tuple> pairRDD1 = 
getPairRDD(predecessors.get(lr_idx),
+        // RDD<Tuple> -> RDD<Tuple2<IndexedKey, Tuple>> -> 
JavaPairRDD<IndexedKey, Tuple>
+        JavaPairRDD<IndexedKey, Tuple> pairRDD1 = 
getPairRDD(predecessors.get(lr_idx),
                 poFRJoin, lr_idx);
         lr_idx ++;
         // RDD transformations to support multiple join inputs:
         //  join().mapPartitions().join().mapPartitions,...
         while (true) {
-            JavaPairRDD<Object, Tuple> pairRDD2 = 
getPairRDD(predecessors.get(lr_idx),
+            JavaPairRDD<IndexedKey, Tuple> pairRDD2 = 
getPairRDD(predecessors.get(lr_idx),
                     poFRJoin, lr_idx);
 
             joinedPairRDD = join(pairRDD1, pairRDD2, poFRJoin);
@@ -72,27 +72,27 @@ public class FRJoinConverter implements
             // (key, (tuple from table1, tuple from table2, tuple from 
table3,...)
             // We need to convert these to the form (key, (tuple)) to
             // prepare it for the next join, i.e.
-            // RDD<Tuple2<Object, Tuple2<Tuple, Tuple>>> ->
-            // RDD<Tuple2<Object, Tuple>> -> JavaPairRDD<Object, Tuple>
-            JavaRDD<Tuple2<Object, Tuple>> resultRDD = joinedPairRDD
+            // RDD<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> ->
+            // RDD<Tuple2<IndexedKey, Tuple> -> JavaPairRDD<IndexedKey, Tuple>
+            JavaRDD<Tuple2<IndexedKey, Tuple>> resultRDD = joinedPairRDD
                     .mapPartitions(new ToKeyValueFunction());
-            pairRDD1 = new JavaPairRDD<Object, Tuple>(
-                    resultRDD.rdd(), SparkUtil.getManifest(Object.class),
+            pairRDD1 = new JavaPairRDD<IndexedKey, Tuple>(
+                    resultRDD.rdd(), SparkUtil.getManifest(IndexedKey.class),
                     SparkUtil.getManifest(Tuple.class));
         }
 
         // map to get JavaRDD<Tuple> from join() output (which is
-        // JavaPairRDD<Object, Tuple2<Tuple, Tuple>>, i.e. tuples are 
separated)
-        // by ignoring the key (of type Object) and concatenating the values
+        // JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>>, i.e. tuples are 
separated)
+        // by ignoring the key (of type IndexedKey) and concatenating the 
values
         // (i.e. the tuples)
         JavaRDD<Tuple> result = joinedPairRDD.mapPartitions(new 
ToValueFunction());
 
         return result.rdd();
     }
 
-    private JavaPairRDD<Object, Tuple2<Tuple, Tuple>> join(
-            JavaPairRDD<Object, Tuple> pairRDD1,
-            JavaPairRDD<Object, Tuple> pairRDD2,
+    private JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> join(
+            JavaPairRDD<IndexedKey, Tuple> pairRDD1,
+            JavaPairRDD<IndexedKey, Tuple> pairRDD2,
             POFRJoin pofrJoin) {
         if (pofrJoin.isLeftOuterJoin()) {
             return leftOuterJoin(pairRDD1, pairRDD2);
@@ -101,33 +101,33 @@ public class FRJoinConverter implements
         }
     }
 
-    private JavaPairRDD<Object, Tuple2<Tuple, Tuple>> leftOuterJoin(
-            JavaPairRDD<Object, Tuple> pairRDD1,
-            JavaPairRDD<Object, Tuple> pairRDD2) {
+    private JavaPairRDD<IndexedKey, Tuple2<Tuple, Tuple>> leftOuterJoin(
+            JavaPairRDD<IndexedKey, Tuple> pairRDD1,
+            JavaPairRDD<IndexedKey, Tuple> pairRDD2) {
 
         // leftouterjoin() returns RDD containing pairs of the form
         // (k, (v, optional(w)))
-        JavaPairRDD<Object, Tuple2<Tuple, Optional<Tuple>>> pairRDD =
+        JavaPairRDD<IndexedKey, Tuple2<Tuple, Optional<Tuple>>> pairRDD =
                 pairRDD1.leftOuterJoin(pairRDD2);
         return pairRDD.mapToPair(new AbsentToEmptyTupleFunction(
                 ((Tuple) pairRDD2.first()._2()).size()));
     }
 
-    private static JavaPairRDD<Object, Tuple> getPairRDD(RDD<Tuple> rdd,
+    private static JavaPairRDD<IndexedKey, Tuple> getPairRDD(RDD<Tuple> rdd,
                                                          POFRJoin poFRJoin,
                                                          int lr_idx) {
-        RDD<Tuple2<Object, Tuple>> keyValRdd = rdd.map(
+        RDD<Tuple2<IndexedKey, Tuple>> keyValRdd = rdd.map(
                 new ExtractKeyFunction(poFRJoin, lr_idx),
-                SparkUtil.<Object, Tuple>getTuple2Manifest());
-        JavaPairRDD<Object, Tuple> pairRDD = new JavaPairRDD<Object, Tuple>(
-                keyValRdd, SparkUtil.getManifest(Object.class),
+                SparkUtil.<IndexedKey, Tuple>getTuple2Manifest());
+        JavaPairRDD<IndexedKey, Tuple> pairRDD = new JavaPairRDD<IndexedKey, 
Tuple>(
+                keyValRdd, SparkUtil.getManifest(IndexedKey.class),
                 SparkUtil.getManifest(Tuple.class));
         return pairRDD;
     }
 
 
     private static class ExtractKeyFunction extends
-            AbstractFunction1<Tuple, Tuple2<Object, Tuple>> implements
+            AbstractFunction1<Tuple, Tuple2<IndexedKey, Tuple>> implements
             Serializable {
 
         private final POFRJoin poFRJoin;
@@ -139,7 +139,7 @@ public class FRJoinConverter implements
         }
 
         @Override
-        public Tuple2<Object, Tuple> apply(Tuple tuple) {
+        public Tuple2<IndexedKey, Tuple> apply(Tuple tuple) {
             poFRJoin.getLRs()[lr_index].attachInput(tuple);
 
             try {
@@ -156,9 +156,10 @@ public class FRJoinConverter implements
 
                 // If tuple is (AA, 5) and key index is $1, then
                 // lrOut is 0 5 (AA), so get(1) returns key
+                Object index = ((Tuple) lrOut.result).get(0);
                 Object key = ((Tuple) lrOut.result).get(1);
                 Tuple value = tuple;
-                Tuple2<Object, Tuple> tuple_KeyValue = new Tuple2<Object, 
Tuple>(key,
+                Tuple2<IndexedKey, Tuple> tuple_KeyValue = new 
Tuple2<IndexedKey, Tuple>(new IndexedKey((Byte) index, key),
                         value);
 
                 return tuple_KeyValue;
@@ -170,24 +171,24 @@ public class FRJoinConverter implements
     }
 
     private static class ToValueFunction
-            implements FlatMapFunction<Iterator<Tuple2<Object, Tuple2<Tuple, 
Tuple>>>, Tuple>,
+            implements FlatMapFunction<Iterator<Tuple2<IndexedKey, 
Tuple2<Tuple, Tuple>>>, Tuple>,
             Serializable {
 
         private class Tuple2TransformIterable implements Iterable<Tuple> {
 
-            Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+            Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
 
             Tuple2TransformIterable(
-                    Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                    Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
                 in = input;
             }
 
             public Iterator<Tuple> iterator() {
-                return new IteratorTransform<Tuple2<Object, Tuple2<Tuple, 
Tuple>>, Tuple>(
+                return new IteratorTransform<Tuple2<IndexedKey, Tuple2<Tuple, 
Tuple>>, Tuple>(
                         in) {
                     @Override
                     protected Tuple transform(
-                            Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+                            Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) {
                         try {
 
                             Tuple leftTuple = next._2()._1();
@@ -218,32 +219,32 @@ public class FRJoinConverter implements
 
         @Override
         public Iterable<Tuple> call(
-                Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
             return new Tuple2TransformIterable(input);
         }
     }
 
         private static class ToKeyValueFunction
-                implements FlatMapFunction<Iterator<Tuple2<Object, 
Tuple2<Tuple, Tuple>>>,
-                Tuple2<Object, Tuple>>, Serializable {
+                implements FlatMapFunction<Iterator<Tuple2<IndexedKey, 
Tuple2<Tuple, Tuple>>>,
+                Tuple2<IndexedKey, Tuple>>, Serializable {
 
             private class Tuple2TransformIterable implements
-                    Iterable<Tuple2<Object, Tuple>> {
+                    Iterable<Tuple2<IndexedKey, Tuple>> {
 
-                Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> in;
+                Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> in;
 
                 Tuple2TransformIterable(
-                        Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+                        Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> 
input) {
                     in = input;
                 }
 
-                public Iterator<Tuple2<Object, Tuple>> iterator() {
-                    return new IteratorTransform<Tuple2<Object, Tuple2<Tuple, 
Tuple>>,
-                            Tuple2<Object, Tuple> >(
+                public Iterator<Tuple2<IndexedKey, Tuple>> iterator() {
+                    return new IteratorTransform<Tuple2<IndexedKey, 
Tuple2<Tuple, Tuple>>,
+                            Tuple2<IndexedKey, Tuple>>(
                             in) {
                         @Override
-                        protected Tuple2<Object, Tuple> transform(
-                                Tuple2<Object, Tuple2<Tuple, Tuple>> next) {
+                        protected Tuple2<IndexedKey, Tuple> transform(
+                                Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> next) 
{
                             try {
 
                                 Tuple leftTuple = next._2()._1();
@@ -260,7 +261,7 @@ public class FRJoinConverter implements
                                     value.set(i + leftTuple.size(),
                                             rightTuple.get(i));
 
-                                Tuple2<Object, Tuple> result = new 
Tuple2<Object, Tuple>(
+                                Tuple2<IndexedKey, Tuple> result = new 
Tuple2<IndexedKey, Tuple>(
                                         next._1(),
                                         value);
                                 return result;
@@ -275,15 +276,15 @@ public class FRJoinConverter implements
 
 
             @Override
-        public Iterable<Tuple2<Object, Tuple>> call(
-                Iterator<Tuple2<Object, Tuple2<Tuple, Tuple>>> input) {
+            public Iterable<Tuple2<IndexedKey, Tuple>> call(
+                    Iterator<Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>> input) {
             return new Tuple2TransformIterable(input);
         }
     }
 
     private static class AbsentToEmptyTupleFunction implements
-            PairFunction<Tuple2<Object, Tuple2<Tuple, Optional<Tuple>>>,
-                    Object, Tuple2<Tuple, Tuple>>, Serializable {
+            PairFunction<Tuple2<IndexedKey, Tuple2<Tuple, Optional<Tuple>>>,
+                    IndexedKey, Tuple2<Tuple, Tuple>>, Serializable {
 
         private int rightTupleSize;
 
@@ -294,20 +295,20 @@ public class FRJoinConverter implements
         // When w is absent in the input tuple (key, (v, optional(w))),
         // the output tuple will contain an empty tuple in it's place.
         @Override
-        public Tuple2<Object, Tuple2<Tuple, Tuple>> call(
-                Tuple2<Object, Tuple2<Tuple, Optional<Tuple>>> input) {
-            final Object key = input._1();
-            Tuple2<Object, Tuple2<Tuple, Tuple>> result;
+        public Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> call(
+                Tuple2<IndexedKey, Tuple2<Tuple, Optional<Tuple>>> input) {
+            final IndexedKey key = input._1();
+            Tuple2<IndexedKey, Tuple2<Tuple, Tuple>> result;
             Tuple2<Tuple, Optional<Tuple>> inval = input._2();
             if (inval._2().isPresent()) {
-                result = new Tuple2<Object, Tuple2<Tuple, Tuple>>(
+                result = new Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>(
                         key,
                         new Tuple2<Tuple, Tuple>(
                                 inval._1(),
                                 inval._2().get())
                 );
             } else {
-                result = new Tuple2<Object, Tuple2<Tuple, Tuple>>(
+                result = new Tuple2<IndexedKey, Tuple2<Tuple, Tuple>>(
                         key,
                         new Tuple2<Tuple, Tuple>(
                                 inval._1(),

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java?rev=1685016&r1=1685015&r2=1685016&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/GlobalRearrangeConverter.java
 Fri Jun 12 01:58:31 2015
@@ -322,122 +322,6 @@ public class GlobalRearrangeConverter im
         }
     }
 
-    /**
-     * IndexedKey records the index and key info.
-     * This is used as key for JOINs. It addresses the case where key is
-     * either empty (or is a tuple with one or more empty fields). In this 
case,
-     * we must respect the SQL standard as documented in the equals() method.
-     */
-    public static class IndexedKey implements Serializable {
-        private byte index;
-        private Object key;
-
-        public IndexedKey(byte index, Object key) {
-            this.index = index;
-            this.key = key;
-        }
-
-        public Object getKey() {
-            return key;
-        }
-
-        @Override
-        public String toString() {
-            return "IndexedKey{" +
-                    "index=" + index +
-                    ", key=" + key +
-                    '}';
-        }
-
-        /**
-         * If key is empty, we'd like compute equality based on key and index.
-         * If key is not empty, we'd like to compute equality based on just 
the key (like we normally do).
-         * There are two possible cases when two tuples are compared:
-         * 1) Compare tuples of same table (same index)
-         * 2) Compare tuples of different tables (different index values)
-         * In 1)
-         * key1    key2    equal?
-         * null    null      Y
-         * foo     null      N
-         * null    foo       N
-         * foo     foo       Y
-         * (1,1)   (1,1)     Y
-         * (1,)    (1,)      Y
-         * (1,2)   (1,2)     Y
-
-         *
-         * In 2)
-         * key1    key2    equal?
-         * null    null     N
-         * foo     null     N
-         * null    foo      N
-         * foo     foo      Y
-         * (1,1)   (1,1)    Y
-         * (1,)    (1,)     N
-         * (1,2)   (1,2)    Y
-
-         *
-         * @param o
-         * @return
-         */
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-            IndexedKey that = (IndexedKey) o;
-            if (index == that.index) {
-                if (key == null && that.key == null) {
-                    return true;
-                } else if (key == null || that.key == null) {
-                    return false;
-                } else{
-                    return key.equals(that.key);
-                }
-            } else {
-                if (key == null || that.key == null) {
-                    return false;
-                } else if (key.equals(that.key) && !containNullfields(key)) {
-                    return true;
-                } else {
-                    return false;
-                }
-            }
-        }
-
-        private boolean containNullfields(Object key) {
-            if (key instanceof Tuple) {
-                for (int i = 0; i < ((Tuple) key).size(); i++) {
-                    try {
-                        if (((Tuple) key).get(i) == null) {
-                            return true;
-                        }
-                    } catch (ExecException e) {
-                        throw new RuntimeException("exception found in " +
-                                "containNullfields", e);
-
-                    }
-                }
-            }
-            return false;
-
-        }
-
-        /**
-         * Calculate hashCode by index and key
-         * if key is empty, return index value
-         * if key is not empty, return the key.hashCode()
-         */
-        @Override
-        public int hashCode() {
-            int result = 0;
-            if (key == null) {
-                result = (int) index;
-            }else {
-                result =  key.hashCode();
-            }
-            return result;
-        }
-    }
 
     /**
      * Converts incoming locally rearranged tuple, which is of the form

Added: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java?rev=1685016&view=auto
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
 (added)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/IndexedKey.java
 Fri Jun 12 01:58:31 2015
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.spark.converter;
+
+import java.io.Serializable;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+
+/**
+ * IndexedKey records the index and key info.
+ * This is used as key for JOINs. It addresses the case where key is
+ * either empty (or is a tuple with one or more empty fields). In this case,
+ * we must respect the SQL standard as documented in the equals() method.
+ */
+public class IndexedKey implements Serializable {
+    private byte index;
+    private Object key;
+
+    public IndexedKey(byte index, Object key) {
+        this.index = index;
+        this.key = key;
+    }
+
+    public Object getKey() {
+        return key;
+    }
+
+    @Override
+    public String toString() {
+        return "IndexedKey{" +
+                "index=" + index +
+                ", key=" + key +
+                '}';
+    }
+
+    /**
+     * If key is empty, we'd like compute equality based on key and index.
+     * If key is not empty, we'd like to compute equality based on just the 
key (like we normally do).
+     * There are two possible cases when two tuples are compared:
+     * 1) Compare tuples of same table (same index)
+     * 2) Compare tuples of different tables (different index values)
+     * In 1)
+     * key1    key2    equal?
+     * null    null      Y
+     * foo     null      N
+     * null    foo       N
+     * foo     foo       Y
+     * (1,1)   (1,1)     Y
+     * (1,)    (1,)      Y
+     * (1,2)   (1,2)     Y
+     * <p/>
+     * <p/>
+     * In 2)
+     * key1    key2    equal?
+     * null    null     N
+     * foo     null     N
+     * null    foo      N
+     * foo     foo      Y
+     * (1,1)   (1,1)    Y
+     * (1,)    (1,)     N
+     * (1,2)   (1,2)    Y
+     *
+     * @param o
+     * @return
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        IndexedKey that = (IndexedKey) o;
+        if (index == that.index) {
+            if (key == null && that.key == null) {
+                return true;
+            } else if (key == null || that.key == null) {
+                return false;
+            } else {
+                return key.equals(that.key);
+            }
+        } else {
+            if (key == null || that.key == null) {
+                return false;
+            } else if (key.equals(that.key) && !containNullfields(key)) {
+                return true;
+            } else {
+                return false;
+            }
+        }
+    }
+
+    private boolean containNullfields(Object key) {
+        if (key instanceof Tuple) {
+            for (int i = 0; i < ((Tuple) key).size(); i++) {
+                try {
+                    if (((Tuple) key).get(i) == null) {
+                        return true;
+                    }
+                } catch (ExecException e) {
+                    throw new RuntimeException("exception found in " +
+                            "containNullfields", e);
+
+                }
+            }
+        }
+        return false;
+
+    }
+
+    /**
+     * Calculate hashCode by index and key
+     * if key is empty, return index value
+     * if key is not empty, return the key.hashCode()
+     */
+    @Override
+    public int hashCode() {
+        int result = 0;
+        if (key == null) {
+            result = (int) index;
+        } else {
+            result = key.hashCode();
+        }
+        return result;
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java?rev=1685016&r1=1685015&r2=1685016&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java (original)
+++ pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java Fri Jun 12 
01:58:31 2015
@@ -16,8 +16,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
-import org.apache.pig.backend.hadoop.executionengine.spark.converter
-        .GlobalRearrangeConverter;
+import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.IndexedKey;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import static org.junit.Assert.assertEquals;
@@ -37,23 +36,23 @@ public class TestIndexedKey {
      */
     @Test
     public void testIndexedKeyWithSameIndexValue() throws Exception {
-        GlobalRearrangeConverter.IndexedKey a0 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
-        GlobalRearrangeConverter.IndexedKey a1 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+        IndexedKey a0 = new IndexedKey(new Byte("0"), "foo");
+        IndexedKey a1 = new IndexedKey(new Byte("0"), null);
         assertEquals(a0.equals(a1), false);
         assertEquals(a0.hashCode()==a1.hashCode(),false);
 
-        GlobalRearrangeConverter.IndexedKey a2 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
-        GlobalRearrangeConverter.IndexedKey a3 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+        IndexedKey a2 = new IndexedKey(new Byte("0"), null);
+        IndexedKey a3 = new IndexedKey(new Byte("0"), "foo");
         assertEquals(a2.equals(a3),false);
         assertEquals(a2.hashCode()==a3.hashCode(),false);
 
-        GlobalRearrangeConverter.IndexedKey a4 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
-        GlobalRearrangeConverter.IndexedKey a5 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),"foo");
+        IndexedKey a4 = new IndexedKey(new Byte("0"), "foo");
+        IndexedKey a5 = new IndexedKey(new Byte("0"), "foo");
         assertEquals(a4.equals(a5),true);
         assertEquals(a4.hashCode()==a5.hashCode(),true);
 
-        GlobalRearrangeConverter.IndexedKey a6 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
-        GlobalRearrangeConverter.IndexedKey a7 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),null);
+        IndexedKey a6 = new IndexedKey(new Byte("0"), null);
+        IndexedKey a7 = new IndexedKey(new Byte("0"), null);
         assertEquals(a6.equals(a7),true);
         assertEquals(a6.hashCode()==a7.hashCode(),true);
 
@@ -63,8 +62,8 @@ public class TestIndexedKey {
         Tuple t2 = TupleFactory.getInstance().newTuple(2);
         t2.set(0,"1");
         t2.set(1,"1");
-        GlobalRearrangeConverter.IndexedKey a8 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t1);
-        GlobalRearrangeConverter.IndexedKey a9 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t2);
+        IndexedKey a8 = new IndexedKey(new Byte("0"), t1);
+        IndexedKey a9 = new IndexedKey(new Byte("0"), t2);
         assertEquals(a8.equals(a9),true);
         assertEquals(a8.hashCode()==a9.hashCode(),true);
 
@@ -74,8 +73,8 @@ public class TestIndexedKey {
         Tuple t4 = TupleFactory.getInstance().newTuple(2);
         t4.set(0,"1");
         t4.set(1,null);
-        GlobalRearrangeConverter.IndexedKey a10 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t3);
-        GlobalRearrangeConverter.IndexedKey a11 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t4);
+        IndexedKey a10 = new IndexedKey(new Byte("0"), t3);
+        IndexedKey a11 = new IndexedKey(new Byte("0"), t4);
         assertEquals(a10.equals(a11),true);
         assertEquals(a10.hashCode()==a11.hashCode(),true);
 
@@ -85,8 +84,8 @@ public class TestIndexedKey {
         Tuple t6 = TupleFactory.getInstance().newTuple(2);
         t6.set(0,"1");
         t6.set(1,"2");
-        GlobalRearrangeConverter.IndexedKey a12 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t5);
-        GlobalRearrangeConverter.IndexedKey a13 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"),t6);
+        IndexedKey a12 = new IndexedKey(new Byte("0"), t5);
+        IndexedKey a13 = new IndexedKey(new Byte("0"), t6);
         assertEquals(a12.equals(a13),false);
         assertEquals(a12.hashCode()==a13.hashCode(),false);
     }
@@ -104,23 +103,23 @@ public class TestIndexedKey {
      */
     @Test
     public void testIndexedKeyWithDifferentIndexValue() throws Exception {
-        GlobalRearrangeConverter.IndexedKey a0 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), "foo");
-        GlobalRearrangeConverter.IndexedKey a1 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), null);
+        IndexedKey a0 = new IndexedKey(new Byte("0"), "foo");
+        IndexedKey a1 = new IndexedKey(new Byte("1"), null);
         assertEquals(a0.equals(a1), false);
         assertEquals(a0.hashCode() == a1.hashCode(), false);
 
-        GlobalRearrangeConverter.IndexedKey a2 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), null);
-        GlobalRearrangeConverter.IndexedKey a3 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), "foo");
+        IndexedKey a2 = new IndexedKey(new Byte("0"), null);
+        IndexedKey a3 = new IndexedKey(new Byte("1"), "foo");
         assertEquals(a2.equals(a3), false);
         assertEquals(a2.hashCode() == a3.hashCode(), false);
 
-        GlobalRearrangeConverter.IndexedKey a4 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), "foo");
-        GlobalRearrangeConverter.IndexedKey a5 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), "foo");
+        IndexedKey a4 = new IndexedKey(new Byte("0"), "foo");
+        IndexedKey a5 = new IndexedKey(new Byte("1"), "foo");
         assertEquals(a4.equals(a5), true);
         assertEquals(a4.hashCode() == a5.hashCode(), true);
 
-        GlobalRearrangeConverter.IndexedKey a6 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), null);
-        GlobalRearrangeConverter.IndexedKey a7 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), null);
+        IndexedKey a6 = new IndexedKey(new Byte("0"), null);
+        IndexedKey a7 = new IndexedKey(new Byte("1"), null);
         assertEquals(a6.equals(a7), false);
         assertEquals(a6.hashCode() == a7.hashCode(), false);
 
@@ -130,8 +129,8 @@ public class TestIndexedKey {
         Tuple t2 = TupleFactory.getInstance().newTuple(2);
         t2.set(0, "1");
         t2.set(1, "1");
-        GlobalRearrangeConverter.IndexedKey a8 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), t1);
-        GlobalRearrangeConverter.IndexedKey a9 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), t2);
+        IndexedKey a8 = new IndexedKey(new Byte("0"), t1);
+        IndexedKey a9 = new IndexedKey(new Byte("1"), t2);
         assertEquals(a8.equals(a9), true);
         assertEquals(a8.hashCode() == a9.hashCode(), true);
 
@@ -141,8 +140,8 @@ public class TestIndexedKey {
         Tuple t4 = TupleFactory.getInstance().newTuple(2);
         t4.set(0, "1");
         t4.set(1, null);
-        GlobalRearrangeConverter.IndexedKey a10 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), t3);
-        GlobalRearrangeConverter.IndexedKey a11 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), t4);
+        IndexedKey a10 = new IndexedKey(new Byte("0"), t3);
+        IndexedKey a11 = new IndexedKey(new Byte("1"), t4);
         assertEquals(a10.equals(a11), false);
         assertEquals(a10.hashCode() == a11.hashCode(), true); //hashcode of 
a10 and a11 are equal but they are not equal
 
@@ -152,8 +151,8 @@ public class TestIndexedKey {
         Tuple t6 = TupleFactory.getInstance().newTuple(2);
         t6.set(0, "1");
         t6.set(1, "2");
-        GlobalRearrangeConverter.IndexedKey a12 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("0"), t5);
-        GlobalRearrangeConverter.IndexedKey a13 = new 
GlobalRearrangeConverter.IndexedKey(new Byte("1"), t6);
+        IndexedKey a12 = new IndexedKey(new Byte("0"), t5);
+        IndexedKey a13 = new IndexedKey(new Byte("1"), t6);
         assertEquals(a12.equals(a13), false);
         assertEquals(a12.hashCode() == a13.hashCode(), false);
     }


Reply via email to