Repository: asterixdb Updated Branches: refs/heads/master 7d75792f1 -> 0dec33a96
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0dec33a9/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java index 6154e28..a87dc1e 100644 --- a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java +++ b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java @@ -19,12 +19,14 @@ package org.apache.hyracks.tests.integration; import java.io.File; +import java.util.Arrays; import org.apache.hyracks.api.constraints.PartitionConstraintHelper; import org.apache.hyracks.api.dataflow.IConnectorDescriptor; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; +import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily; import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; @@ -32,8 +34,10 @@ import org.apache.hyracks.api.io.FileSplit; import org.apache.hyracks.api.io.ManagedFileSplit; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.data.std.accessors.MurmurHash3BinaryHashFunctionFamily; import org.apache.hyracks.data.std.accessors.PointableBinaryComparatorFactory; import org.apache.hyracks.data.std.accessors.PointableBinaryHashFunctionFactory; +import org.apache.hyracks.data.std.accessors.UTF8StringBinaryHashFunctionFamily; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.dataflow.common.data.marshalling.UTF8StringSerializerDeserializer; import org.apache.hyracks.dataflow.common.data.parsers.IValueParserFactory; @@ -46,9 +50,12 @@ import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; import org.apache.hyracks.dataflow.std.file.DelimitedDataTupleParserFactory; import org.apache.hyracks.dataflow.std.file.FileScanOperatorDescriptor; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; -import org.apache.hyracks.dataflow.std.join.HybridHashJoinOperatorDescriptor; +import org.apache.hyracks.dataflow.std.file.PlainFileWriterOperatorDescriptor; import org.apache.hyracks.dataflow.std.join.InMemoryHashJoinOperatorDescriptor; +import org.apache.hyracks.dataflow.std.join.JoinComparatorFactory; +import org.apache.hyracks.dataflow.std.join.OptimizedHybridHashJoinOperatorDescriptor; import org.apache.hyracks.dataflow.std.misc.MaterializingOperatorDescriptor; +import org.apache.hyracks.dataflow.std.misc.NullSinkOperatorDescriptor; import org.apache.hyracks.dataflow.std.result.ResultWriterOperatorDescriptor; import org.apache.hyracks.tests.util.NoopMissingWriterFactory; import org.apache.hyracks.tests.util.ResultSerializerFactoryProvider; @@ -68,6 +75,40 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { * NULL, O_COMMENT VARCHAR(79) NOT NULL ); */ + private static boolean DEBUG = false; + + static RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static RecordDescriptor ordersDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static RecordDescriptor custOrderJoinDesc = + new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), + new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); + + static IValueParserFactory[] custValueParserFactories = new IValueParserFactory[custDesc.getFieldCount()]; + static IValueParserFactory[] orderValueParserFactories = new IValueParserFactory[ordersDesc.getFieldCount()]; + + static { + Arrays.fill(custValueParserFactories, UTF8StringParserFactory.INSTANCE); + Arrays.fill(orderValueParserFactories, UTF8StringParserFactory.INSTANCE); + } + @Test public void customerOrderCIDJoin() throws Exception { JobSpecification spec = new JobSpecification(); @@ -75,50 +116,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, @@ -155,57 +163,28 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2, - new int[] { 1 }, new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20, + 1.2, new int[] { 1 }, new int[] { 0 }, + new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null, false, null); + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null, + false, null); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -235,50 +214,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; @@ -320,50 +266,17 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); IMissingWriterFactory[] nonMatchWriterFactories = new IMissingWriterFactory[ordersDesc.getFieldCount()]; @@ -371,11 +284,14 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { nonMatchWriterFactories[j] = NoopMissingWriterFactory.INSTANCE; } - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 32, 20, 200, 1.2, - new int[] { 0 }, new int[] { 1 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20, + 1.2, new int[] { 0 }, new int[] { 1 }, + new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null, true, nonMatchWriterFactories); + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), null, + true, nonMatchWriterFactories); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); ResultSetId rsId = new ResultSetId(1); @@ -408,11 +324,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, @@ -420,41 +331,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, @@ -498,11 +381,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, @@ -510,48 +388,24 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); - HybridHashJoinOperatorDescriptor join = new HybridHashJoinOperatorDescriptor(spec, 5, 20, 100, 1.2, + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2, new int[] { 1 }, new int[] { 0 }, - new IBinaryHashFunctionFactory[] { PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) }, + new IBinaryHashFunctionFamily[] { MurmurHash3BinaryHashFunctionFamily.INSTANCE }, new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, - custOrderJoinDesc, null, false, null); + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), null, + false, null); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID, NC2_ID); ResultSetId rsId = new ResultSetId(1); @@ -588,11 +442,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, @@ -600,41 +449,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); InMemoryHashJoinOperatorDescriptor join = new InMemoryHashJoinOperatorDescriptor(spec, new int[] { 1 }, @@ -678,11 +499,6 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "customer-part2.tbl") }; IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); - RecordDescriptor custDesc = new RecordDescriptor(new ISerializerDeserializer[] { - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileSplit[] ordersSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, @@ -690,41 +506,13 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders-part2.tbl") }; IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); - RecordDescriptor ordersDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); - - RecordDescriptor custOrderJoinDesc = - new RecordDescriptor(new ISerializerDeserializer[] { new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer(), - new UTF8StringSerializerDeserializer(), new UTF8StringSerializerDeserializer() }); FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - ordersDesc); + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC1_ID, NC2_ID); - FileScanOperatorDescriptor custScanner = - new FileScanOperatorDescriptor(spec, custSplitsProvider, - new DelimitedDataTupleParserFactory(new IValueParserFactory[] { - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE, - UTF8StringParserFactory.INSTANCE, UTF8StringParserFactory.INSTANCE }, '|'), - custDesc); + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID, NC2_ID); MaterializingOperatorDescriptor ordMat = new MaterializingOperatorDescriptor(spec, ordersDesc); @@ -769,4 +557,161 @@ public class TPCHCustomerOrderHashJoinTest extends AbstractIntegrationTest { spec.addRoot(printer); runTest(spec); } + + @Test + public void customerOrderCIDHybridHashJoin_Case1() throws Exception { + JobSpecification spec = new JobSpecification(); + FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, + "data" + File.separator + "tpch0.001" + File.separator + "customer4.tbl") }; + IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); + + FileSplit[] ordersSplits = new FileSplit[] { + new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") }; + + IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); + FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); + + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); + + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 243, + 1.2, new int[] { 0 }, new int[] { 1 }, + new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + null); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); + + String path = getClass().getName() + File.separator + "case1"; + IOperatorDescriptor printer = getPrinter(spec, path); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); + + IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); + spec.connect(custJoinConn, custScanner, 0, join, 0); + + IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); + spec.connect(ordJoinConn, ordScanner, 0, join, 1); + + IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); + spec.connect(joinPrinterConn, join, 0, printer, 0); + + spec.addRoot(printer); + runTest(spec); + System.out.println("output to " + path); + } + + @Test + public void customerOrderCIDHybridHashJoin_Case2() throws Exception { + JobSpecification spec = new JobSpecification(); + + FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, + "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") }; + IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); + + FileSplit[] ordersSplits = new FileSplit[] { + new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders4.tbl") }; + + IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); + + FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); + + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); + + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 15, 122, + 1.2, new int[] { 0 }, new int[] { 1 }, + new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + null); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); + + String path = getClass().getName() + File.separator + "case2"; + IOperatorDescriptor printer = getPrinter(spec, path); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); + + IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); + spec.connect(custJoinConn, custScanner, 0, join, 0); + + IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); + spec.connect(ordJoinConn, ordScanner, 0, join, 1); + + IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); + spec.connect(joinPrinterConn, join, 0, printer, 0); + + spec.addRoot(printer); + runTest(spec); + System.out.println("output to " + path); + } + + @Test + public void customerOrderCIDHybridHashJoin_Case3() throws Exception { + + JobSpecification spec = new JobSpecification(); + + FileSplit[] custSplits = new FileSplit[] { new ManagedFileSplit(NC1_ID, + "data" + File.separator + "tpch0.001" + File.separator + "customer3.tbl") }; + IFileSplitProvider custSplitsProvider = new ConstantFileSplitProvider(custSplits); + + FileSplit[] ordersSplits = new FileSplit[] { + new ManagedFileSplit(NC2_ID, "data" + File.separator + "tpch0.001" + File.separator + "orders1.tbl") }; + + IFileSplitProvider ordersSplitsProvider = new ConstantFileSplitProvider(ordersSplits); + + FileScanOperatorDescriptor ordScanner = new FileScanOperatorDescriptor(spec, ordersSplitsProvider, + new DelimitedDataTupleParserFactory(orderValueParserFactories, '|'), ordersDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, ordScanner, NC2_ID); + + FileScanOperatorDescriptor custScanner = new FileScanOperatorDescriptor(spec, custSplitsProvider, + new DelimitedDataTupleParserFactory(custValueParserFactories, '|'), custDesc); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, custScanner, NC1_ID); + + OptimizedHybridHashJoinOperatorDescriptor join = new OptimizedHybridHashJoinOperatorDescriptor(spec, 6, 122, + 1.2, new int[] { 0 }, new int[] { 1 }, + new IBinaryHashFunctionFamily[] { UTF8StringBinaryHashFunctionFamily.INSTANCE }, + new IBinaryComparatorFactory[] { PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY) }, + custOrderJoinDesc, + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 0, 1), + new JoinComparatorFactory(PointableBinaryComparatorFactory.of(UTF8StringPointable.FACTORY), 1, 0), + null); + + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join, NC1_ID); + + String path = getClass().getName() + File.separator + "case3"; + IOperatorDescriptor printer = getPrinter(spec, path); + PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, printer, NC1_ID); + + IConnectorDescriptor custJoinConn = new OneToOneConnectorDescriptor(spec); + spec.connect(custJoinConn, custScanner, 0, join, 0); + + IConnectorDescriptor ordJoinConn = new MToNBroadcastConnectorDescriptor(spec); + spec.connect(ordJoinConn, ordScanner, 0, join, 1); + + IConnectorDescriptor joinPrinterConn = new OneToOneConnectorDescriptor(spec); + spec.connect(joinPrinterConn, join, 0, printer, 0); + + spec.addRoot(printer); + runTest(spec); + System.out.println("output to " + path); + } + + private IOperatorDescriptor getPrinter(JobSpecification spec, String path) { + IFileSplitProvider outputSplitProvider = + new ConstantFileSplitProvider(new FileSplit[] { new ManagedFileSplit(NC1_ID, path) }); + + return DEBUG ? new PlainFileWriterOperatorDescriptor(spec, outputSplitProvider, "|") + : new NullSinkOperatorDescriptor(spec); + } }
