Repository: asterixdb
Updated Branches:
  refs/heads/master 7d75792f1 -> 0dec33a96


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index 6154e28..a87dc1e 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -19,12 +19,14 @@
 package org.apache.hyracks.tests.integration;
 
 import java.io.File;
+import java.util.Arrays;
 
 import org.apache.hyracks.api.constraints.PartitionConstraintHelper;
 import org.apache.hyracks.api.dataflow.IConnectorDescriptor;
 import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
+import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
 import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
 import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
 import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -32,8 +34,10 @@ import org.apache.hyracks.api.io.FileSplit;
 import org.apache.hyracks.api.io.ManagedFileSplit;
 import org.apache.hyracks.api.job.JobSpecification;
 import org.apache.hyracks.api.result.ResultSetId;
+import 
org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory;
 import 
org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory;
+import 
org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily;
 import org.apache.hyracks.data.std.primitive.UTF8StringPointable;
 import 
org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer;
 import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory;
@@ -46,9 +50,12 @@ import 
org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider;
 import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory;
 import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
-import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory;
+import 
org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor;
+import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor;
 import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor;
 import org.apache.hyracks.tests.util.NoopMissingWriterFactory;
 import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider;
@@ -68,6 +75,40 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
      * NULL, O_COMMENT VARCHAR(79) NOT NULL );
      */
 
+    private static boolean DEBUG = false;
+
+    static RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
+            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+            new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor ordersDesc =
+            new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
+
+    static RecordDescriptor custOrderJoinDesc =
+            new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
+                    new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
+
+    static IValueParserFactory[] custValueParserFactories = new 
IValueParserFactory[custDesc.getFieldCount()];
+    static IValueParserFactory[] orderValueParserFactories = new 
IValueParserFactory[ordersDesc.getFieldCount()];
+
+    static {
+        Arrays.fill(custValueParserFactories, 
UTF8StringParserFactory.INSTANCE);
+        Arrays.fill(orderValueParserFactories, 
UTF8StringParserFactory.INSTANCE);
+    }
+
     @Test
     public void customerOrderCIDJoin() throws Exception {
         JobSpecification spec = new JobSpecification();
@@ -75,50 +116,17 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new 
InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -155,57 +163,28 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
 
-        HybridHashJoinOperatorDescriptor join = new 
HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2,
-                new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { 
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 1 }, new int[] { 0 },
+                new IBinaryHashFunctionFamily[] { 
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1), null,
+                false, null);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -235,50 +214,17 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new 
IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -320,50 +266,17 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
         FileSplit[] custSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "customer.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
 
         IMissingWriterFactory[] nonMatchWriterFactories = new 
IMissingWriterFactory[ordersDesc.getFieldCount()];
@@ -371,11 +284,14 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
             nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE;
         }
 
-        HybridHashJoinOperatorDescriptor join = new 
HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2,
-                new int[] { 0 }, new int[] { 1 },
-                new IBinaryHashFunctionFactory[] { 
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { 
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, true, nonMatchWriterFactories);
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0), null,
+                true, nonMatchWriterFactories);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -408,11 +324,6 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -420,41 +331,13 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new 
InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -498,11 +381,6 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -510,48 +388,24 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID, NC2_ID);
 
-        HybridHashJoinOperatorDescriptor join = new 
HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2,
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
                 new int[] { 1 }, new int[] { 0 },
-                new IBinaryHashFunctionFactory[] { 
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
+                new IBinaryHashFunctionFamily[] { 
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
                 new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
-                custOrderJoinDesc, null, false, null);
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1), null,
+                false, null);
+
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID, NC2_ID);
 
         ResultSetId rsId = new ResultSetId(1);
@@ -588,11 +442,6 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -600,41 +449,13 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID, NC2_ID);
 
         InMemoryHashJoinOperatorDescriptor join = new 
InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 },
@@ -678,11 +499,6 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "customer-part2.tbl") };
         IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
-        RecordDescriptor custDesc = new RecordDescriptor(new 
ISerializerDeserializer[] {
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileSplit[] ordersSplits = new FileSplit[] {
                 new ManagedFileSplit(NC1_ID,
@@ -690,41 +506,13 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
                 new ManagedFileSplit(NC2_ID,
                         "data" + File.separator + "tpch0.001" + File.separator 
+ "orders-part2.tbl") };
         IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
-        RecordDescriptor ordersDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
-
-        RecordDescriptor custOrderJoinDesc =
-                new RecordDescriptor(new ISerializerDeserializer[] { new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer(),
-                        new UTF8StringSerializerDeserializer(), new 
UTF8StringSerializerDeserializer() });
 
         FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
-                new DelimitedDataTupleParserFactory(new IValueParserFactory[] 
{ UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                        UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                ordersDesc);
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC1_ID, NC2_ID);
 
-        FileScanOperatorDescriptor custScanner =
-                new FileScanOperatorDescriptor(spec, custSplitsProvider,
-                        new DelimitedDataTupleParserFactory(new 
IValueParserFactory[] {
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE,
-                                UTF8StringParserFactory.INSTANCE, 
UTF8StringParserFactory.INSTANCE }, '|'),
-                        custDesc);
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
         PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID, NC2_ID);
 
         MaterializingOperatorDescriptor ordMat = new 
MaterializingOperatorDescriptor(spec, ordersDesc);
@@ -769,4 +557,161 @@ public class TPCHCustomerOrderHashJoinTest extends 
AbstractIntegrationTest {
         spec.addRoot(printer);
         runTest(spec);
     }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case1() throws Exception {
+        JobSpecification spec = new JobSpecification();
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + 
"customer4.tbl") };
+        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
+        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case1";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case2() throws Exception {
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + 
"customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders4.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case2";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    @Test
+    public void customerOrderCIDHybridHashJoin_Case3() throws Exception {
+
+        JobSpecification spec = new JobSpecification();
+
+        FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID,
+                "data" + File.separator + "tpch0.001" + File.separator + 
"customer3.tbl") };
+        IFileSplitProvider custSplitsProvider = new 
ConstantFileSplitProvider(custSplits);
+
+        FileSplit[] ordersSplits = new FileSplit[] {
+                new ManagedFileSplit(NC2_ID, "data" + File.separator + 
"tpch0.001" + File.separator + "orders1.tbl") };
+
+        IFileSplitProvider ordersSplitsProvider = new 
ConstantFileSplitProvider(ordersSplits);
+
+        FileScanOperatorDescriptor ordScanner = new 
FileScanOperatorDescriptor(spec, ordersSplitsProvider,
+                new DelimitedDataTupleParserFactory(orderValueParserFactories, 
'|'), ordersDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
ordScanner, NC2_ID);
+
+        FileScanOperatorDescriptor custScanner = new 
FileScanOperatorDescriptor(spec, custSplitsProvider,
+                new DelimitedDataTupleParserFactory(custValueParserFactories, 
'|'), custDesc);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, 
custScanner, NC1_ID);
+
+        OptimizedHybridHashJoinOperatorDescriptor join = new 
OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122,
+                1.2, new int[] { 0 }, new int[] { 1 },
+                new IBinaryHashFunctionFamily[] { 
UTF8StringBinaryHashFunctionFamily.INSTANCE },
+                new IBinaryComparatorFactory[] { 
PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) },
+                custOrderJoinDesc,
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 0, 1),
+                new 
JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY),
 1, 0),
+                null);
+
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, 
NC1_ID);
+
+        String path = getClass().getName() + File.separator + "case3";
+        IOperatorDescriptor printer = getPrinter(spec, path);
+        PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, 
NC1_ID);
+
+        IConnectorDescriptor custJoinConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(custJoinConn, custScanner, 0, join, 0);
+
+        IConnectorDescriptor ordJoinConn = new 
MToNBroadcastConnectorDescriptor(spec);
+        spec.connect(ordJoinConn, ordScanner, 0, join, 1);
+
+        IConnectorDescriptor joinPrinterConn = new 
OneToOneConnectorDescriptor(spec);
+        spec.connect(joinPrinterConn, join, 0, printer, 0);
+
+        spec.addRoot(printer);
+        runTest(spec);
+        System.out.println("output to " + path);
+    }
+
+    private IOperatorDescriptor getPrinter(JobSpecification spec, String path) 
{
+        IFileSplitProvider outputSplitProvider =
+                new ConstantFileSplitProvider(new FileSplit[] { new 
ManagedFileSplit(NC1_ID, path) });
+
+        return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, 
outputSplitProvider, "|")
+                : new NullSinkOperatorDescriptor(spec);
+    }
 }

Reply via email to