Author: rohini
Date: Tue Aug 11 23:03:16 2015
New Revision: 1695398

URL: http://svn.apache.org/r1695398
Log:
PIG-4651: Optimize NullablePartitionWritable serialization for skewed join 
(rohini)

Added:
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Aug 11 23:03:16 2015
@@ -38,6 +38,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4651: Optimize NullablePartitionWritable serialization for skewed join 
(rohini)
+
 PIG-4627: [Pig on Tez] Self join does not handle null values correctly (rohini)
 
 PIG-4644: PORelationToExprProject.clone() is broken (erwaman via rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/HDataType.java Tue Aug 11 
23:03:16 2015
@@ -86,6 +86,48 @@ public class HDataType {
         }
     }
 
+    /**
+     * Creates a new, empty nullable writable wrapper for the given Pig key type.
+     *
+     * @param keyType one of the {@link DataType} byte constants identifying the key type
+     * @return a freshly constructed writable of the class matching {@code keyType}
+     * @throws ExecException with error code 1068 when {@code keyType} is a map, or
+     *         with error code 2044 when {@code keyType} is not one of the handled types
+     */
+    public static PigNullableWritable getNewWritableComparable(byte keyType) throws ExecException {
+        switch (keyType) {
+            case DataType.BAG:
+                return new NullableBag();
+            case DataType.BOOLEAN:
+                return new NullableBooleanWritable();
+            case DataType.BYTEARRAY:
+                return new NullableBytesWritable();
+            case DataType.CHARARRAY:
+                return new NullableText();
+            case DataType.DOUBLE:
+                return new NullableDoubleWritable();
+            case DataType.FLOAT:
+                return new NullableFloatWritable();
+            case DataType.INTEGER:
+                return new NullableIntWritable();
+            case DataType.BIGINTEGER:
+                return new NullableBigIntegerWritable();
+            case DataType.BIGDECIMAL:
+                return new NullableBigDecimalWritable();
+            case DataType.LONG:
+                return new NullableLongWritable();
+            case DataType.DATETIME:
+                return new NullableDateTimeWritable();
+            case DataType.TUPLE:
+                return new NullableTuple();
+            case DataType.MAP: {
+                int errCode = 1068;
+                String msg = "Using Map as key not supported.";
+                throw new ExecException(msg, errCode, PigException.INPUT);
+            }
+            default: {
+                // Build the type-name lookup table lazily on first use.
+                if (typeToName == null) typeToName = DataType.genTypeToNameMap();
+                int errCode = 2044;
+                // The conditional must be parenthesized: '+' binds tighter than
+                // '==' and '?:', so without the parentheses the null test is
+                // applied to the concatenated string (never null) and the
+                // "The type " prefix is dropped from the message.
+                String msg = "The type "
+                    + (typeToName.get(keyType) == null ? "" + keyType : typeToName.get(keyType))
+                    + " cannot be collected as a Key type";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+        }
+    }
+
     public static PigNullableWritable getWritableComparableTypes(Object o, 
byte keyType) throws ExecException{
 
         byte newKeyType = keyType;
@@ -261,6 +303,14 @@ public class HDataType {
         return wcKey;
     }
 
+    public static byte findTypeFromClassName(String className) throws 
ExecException {
+        if (classToTypeMap.containsKey(className)) {
+            return classToTypeMap.get(className);
+        } else {
+            throw new ExecException("Unable to map " + className + " to known 
types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+        }
+    }
+
     public static byte findTypeFromNullableWritable(PigNullableWritable o) 
throws ExecException {
         if (o instanceof NullableBooleanWritable)
             return DataType.BOOLEAN;

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java?rev=1695398&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigWritableComparators.java
 Tue Aug 11 23:03:16 2015
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+public class PigWritableComparators {
+
+    //
+    // Raw Comparators for Skewed Join
+    //
+    public static class PigBooleanRawPartitionComparator extends 
PigBooleanRawComparator {
+
+        public PigBooleanRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            // Skip the first byte which is the type of the key
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigIntRawPartitionComparator extends 
PigIntRawComparator {
+
+        public PigIntRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigIntegerRawPartitionComparator extends 
PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigDecimalRawPartitionComparator extends 
PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigLongRawPartitionComparator extends 
PigLongRawComparator {
+
+        public PigLongRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigFloatRawPartitionComparator extends 
PigFloatRawComparator {
+
+        public PigFloatRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDoubleRawPartitionComparator extends 
PigDoubleRawComparator {
+
+        public PigDoubleRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDateTimeRawPartitionComparator extends 
PigDateTimeRawComparator {
+
+        public PigDateTimeRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTextRawPartitionComparator extends 
PigTextRawComparator {
+
+        public PigTextRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBytesRawPartitionComparator extends 
PigBytesRawComparator {
+
+        public PigBytesRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTupleSortPartitionComparator extends 
PigTupleSortComparator {
+
+        public PigTupleSortPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+}
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.pig.impl.io.NullablePartitionWritable;
+
+public class PigWritableComparators {
+
+    //
+    // Raw Comparators for Skewed Join
+    //
+    public static class PigBooleanRawPartitionComparator extends 
PigBooleanRawComparator {
+
+        public PigBooleanRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            // Skip the first byte which is the type of the key
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigIntRawPartitionComparator extends 
PigIntRawComparator {
+
+        public PigIntRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigIntegerRawPartitionComparator extends 
PigBigIntegerRawComparator {
+
+        public PigBigIntegerRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBigDecimalRawPartitionComparator extends 
PigBigDecimalRawComparator {
+
+        public PigBigDecimalRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigLongRawPartitionComparator extends 
PigLongRawComparator {
+
+        public PigLongRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigFloatRawPartitionComparator extends 
PigFloatRawComparator {
+
+        public PigFloatRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDoubleRawPartitionComparator extends 
PigDoubleRawComparator {
+
+        public PigDoubleRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigDateTimeRawPartitionComparator extends 
PigDateTimeRawComparator {
+
+        public PigDateTimeRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTextRawPartitionComparator extends 
PigTextRawComparator {
+
+        public PigTextRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigBytesRawPartitionComparator extends 
PigBytesRawComparator {
+
+        public PigBytesRawPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+    public static class PigTupleSortPartitionComparator extends 
PigTupleSortComparator {
+
+        public PigTupleSortPartitionComparator() {
+            super();
+        }
+
+        @Override
+        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int 
l2) {
+            return super.compare(b1, s1 + 1, l1, b2, s2 + 1, l2);
+        }
+
+        @Override
+        public int compare(Object o1, Object o2) {
+            return super.compare(((NullablePartitionWritable)o1).getKey(), 
((NullablePartitionWritable)o2).getKey());
+        }
+    }
+
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Tue Aug 11 23:03:16 2015
@@ -58,7 +58,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -77,6 +76,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -1035,21 +1035,76 @@ public class TezDagBuilder extends TezOp
         }
     }
 
+    /**
+     * Selects the raw comparator class for a skewed join whose keys have the
+     * given type. The returned classes are the partition-aware comparators
+     * nested in {@link PigWritableComparators}.
+     *
+     * @param keyType one of the {@link DataType} byte constants
+     * @return the comparator class matching {@code keyType}
+     * @throws JobCreationException with error code 1068 for map or bag keys,
+     *         or with error code 2036 for any other unhandled key type
+     */
+    private static Class<? extends WritableComparator> getRawComparatorForSkewedJoin(byte keyType)
+            throws JobCreationException {
+
+        switch (keyType) {
+        // Key types with a dedicated partition comparator.
+        case DataType.BOOLEAN:
+            return PigWritableComparators.PigBooleanRawPartitionComparator.class;
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawPartitionComparator.class;
+        case DataType.BIGINTEGER:
+            return PigWritableComparators.PigBigIntegerRawPartitionComparator.class;
+        case DataType.BIGDECIMAL:
+            return PigWritableComparators.PigBigDecimalRawPartitionComparator.class;
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawPartitionComparator.class;
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawPartitionComparator.class;
+        case DataType.DOUBLE:
+            return PigWritableComparators.PigDoubleRawPartitionComparator.class;
+        case DataType.DATETIME:
+            return PigWritableComparators.PigDateTimeRawPartitionComparator.class;
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawPartitionComparator.class;
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawPartitionComparator.class;
+        case DataType.TUPLE:
+            return PigWritableComparators.PigTupleSortPartitionComparator.class;
+
+        // Key types rejected as user input errors.
+        case DataType.MAP:
+            throw new JobCreationException("Using Map as key not supported.", 1068, PigException.INPUT);
+        case DataType.BAG:
+            throw new JobCreationException("Using Bag as key not supported.", 1068, PigException.INPUT);
+
+        // Anything else is reported as an internal error.
+        default:
+            throw new JobCreationException("Unhandled key type " + DataType.findTypeName(keyType),
+                    2036, PigException.BUG);
+        }
+    }
+
     void selectOutputComparator(byte keyType, Configuration conf, TezOperator 
tezOp)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         // TODO: Group comparators as in JobControlCompiler
-        if (tezOp != null && tezOp.isUseSecondaryKey()) {
+        if (tezOp == null) {
+            return;
+        }
+        if (tezOp.isUseSecondaryKey()) {
             conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
                     PigSecondaryKeyComparator.class.getName());
             setGroupingComparator(conf, 
PigSecondaryKeyGroupComparator.class.getName());
         } else {
-            if (tezOp != null && tezOp.isSkewedJoin()) {
-                // TODO: PigGroupingPartitionWritableComparator only used as 
Group comparator in MR.
-                // What should be TEZ_RUNTIME_KEY_COMPARATOR_CLASS if same as 
MR?
-                
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-                        
PigGroupingPartitionWritableComparator.class.getName());
-                setGroupingComparator(conf, 
PigGroupingPartitionWritableComparator.class.getName());
+            if (tezOp.isSkewedJoin()) {
+                
conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawComparatorForSkewedJoin(keyType), 
RawComparator.class);
             } else {
                 conf.setClass(
                         
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,

Modified: pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java?rev=1695398&r1=1695397&r2=1695398&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java 
(original)
+++ pig/trunk/src/org/apache/pig/impl/io/NullablePartitionWritable.java Tue Aug 
11 23:03:16 2015
@@ -70,9 +70,9 @@ public class NullablePartitionWritable e
 
        @Override
     public void readFields(DataInput in) throws IOException {
-               String c = in.readUTF();
+               byte type = in.readByte();
                try {
-                       key = HDataType.getWritableComparable(c);
+                       key = HDataType.getNewWritableComparable(type);
                } catch(Exception e) {
                        throw new IOException(e);
                }
@@ -81,7 +81,7 @@ public class NullablePartitionWritable e
 
        @Override
     public void write(DataOutput out) throws IOException {
-               out.writeUTF(key.getClass().getName());
+               
out.writeByte(HDataType.findTypeFromClassName(key.getClass().getName()));
                key.write(out);
        }
 


Reply via email to