This is an automated email from the ASF dual-hosted git repository.
dlych pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 64786ba [ASTERIXDB-3004][RT] Improve hash join performance
new 8293017 Merge branch 'gerrit/neo'
64786ba is described below
commit 64786ba65f6b1b950534a46736c2b2eb3b6605e8
Author: Dmitry Lychagin <[email protected]>
AuthorDate: Wed Jan 12 17:03:52 2022 -0800
[ASTERIXDB-3004][RT] Improve hash join performance
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Improve hash join performance when joined values are NULL/MISSING
- Add SqlppHashJoinRQJTest to test different hash join scenarios
Change-Id: I8f0afb05908e8281f2865775e074d459964fe989
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/14784
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Dmitry Lychagin <[email protected]>
Reviewed-by: Ali Alsuliman <[email protected]>
---
.../asterix/test/runtime/ExecutionTestUtil.java | 7 +-
.../asterix/test/runtime/SqlppHashJoinRQJTest.java | 442 +++++++++++++++++++++
.../test/runtime/SqlppNumericIndexRQGTest.java | 4 +-
.../asterix/test/runtime/SqlppRQGTestBase.java | 2 +-
.../hash_join_missing.1.query.sqlpp | 23 +-
.../hash_join_missing.2.query.sqlpp | 23 +-
.../join/hash_join_missing/hash_join_missing.1.adm | 1 +
.../join/hash_join_missing/hash_join_missing.2.adm | 1 +
.../test/resources/runtimets/testsuite_sqlpp.xml | 5 +
.../PredicateEvaluatorFactoryProvider.java | 19 +-
.../physical/HybridHashJoinPOperator.java | 19 +-
.../physical/InMemoryHashJoinPOperator.java | 14 +-
.../api/dataflow/value/IPredicateEvaluator.java | 5 +-
.../dataflow/value/IPredicateEvaluatorFactory.java | 5 +-
.../value/IPredicateEvaluatorFactoryProvider.java | 3 +-
.../dataflow/std/join/HybridHashJoinUtil.java | 17 +-
.../dataflow/std/join/InMemoryHashJoin.java | 27 +-
.../join/InMemoryHashJoinOperatorDescriptor.java | 20 +-
.../hyracks/dataflow/std/join/NestedLoopJoin.java | 23 +-
.../std/join/NestedLoopJoinOperatorDescriptor.java | 16 +-
.../dataflow/std/join/OptimizedHybridHashJoin.java | 70 ++--
.../OptimizedHybridHashJoinOperatorDescriptor.java | 33 +-
.../integration/OptimizedHybridHashJoinTest.java | 4 +-
.../integration/TPCHCustomerOrderHashJoinTest.java | 44 +-
.../apache/hyracks/examples/tpch/client/Join.java | 4 +-
25 files changed, 620 insertions(+), 211 deletions(-)
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
index 2328bf4..ccdf620 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ExecutionTestUtil.java
@@ -73,11 +73,10 @@ public class ExecutionTestUtil {
}
integrationUtil.init(cleanup, configFile);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("initializing HDFS");
- }
-
if (startHdfs) {
+ if (LOGGER.isInfoEnabled()) {
+ LOGGER.info("initializing HDFS");
+ }
HDFSCluster.getInstance().setup();
}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
new file mode 100644
index 0000000..40ee38e
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppHashJoinRQJTest.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.test.runtime;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintWriter;
+import java.nio.IntBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import org.apache.asterix.api.http.server.QueryServiceRequestParameters;
+import org.apache.asterix.common.utils.Servlets;
+import org.apache.asterix.lang.sqlpp.parser.SqlppHint;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.asterix.testframework.xml.ParameterTypeEnum;
+import org.apache.asterix.testframework.xml.TestCase;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.random.MersenneTwister;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.hyracks.algebricks.common.utils.Pair;
+import org.apache.hyracks.algebricks.core.algebra.base.PhysicalOperatorTag;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * RQG testsuite for hash joins.
+ * Tests:
+ * <ul>
+ * <li> Fields with / without NULL or MISSING values</li>
+ * <li> Inner / Left Outer joins</li>
+ * <li> Repartitioning / Broadcast joins </li>
+ * </ul>
+ */
+@RunWith(Parameterized.class)
+public class SqlppHashJoinRQJTest {
+
+ static final Logger LOGGER =
LogManager.getLogger(SqlppHashJoinRQJTest.class);
+
+ static final String CONF_PROPERTY_SEED =
+
SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class,
"seed");
+ static final long CONF_PROPERTY_SEED_DEFAULT = System.currentTimeMillis();
+
+ static final String CONF_PROPERTY_LIMIT =
+
SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class,
"limit");
+ static final int CONF_PROPERTY_LIMIT_DEFAULT = 40;
+
+ static final String CONF_PROPERTY_OFFSET =
+
SqlppRQGTestBase.getConfigurationPropertyName(SqlppHashJoinRQJTest.class,
"offset");
+ static final int CONF_PROPERTY_OFFSET_DEFAULT = 0;
+
+ static final Path OUTPUT_DIR = Paths.get("target",
SqlppHashJoinRQJTest.class.getSimpleName());
+
+ static final String DATAVERSE_NAME = "dvTest";
+ static final String[] DATASET_NAMES = new String[] { "ds1", "ds2" };
+ static final String ID_COLUMN_NAME = "id";
+ static final String BASE_COLUMN_NAME = "i";
+ static final List<Integer> DATASET_ROWS = Arrays.asList(20000, 40000);
+ static final List<Integer> DATASET_COLUMNS = Arrays.asList(4, 10, 100,
1000, 10000);
+ static final int DATASET_COLUMN_LENGTH_MIN =
+
String.valueOf(DATASET_COLUMNS.stream().mapToInt(Integer::intValue).max().orElse(0)).length();
+ static final int DATASET_COLUMN_LENGTH_MAX = Math.max(20,
DATASET_COLUMN_LENGTH_MIN);
+ static final int NULLABLE_COLUMN_RATIO = 2;
+ static final int OUTER_JOIN_RATIO = 3;
+ static final int BROADCAST_RATIO = 4;
+
+ static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ static final ObjectReader OBJECT_READER =
OBJECT_MAPPER.readerFor(ObjectNode.class);
+
+ static long datasetRowCount;
+ static int datasetColumnLength;
+ static TestExecutor testExecutor;
+
+ final TestInstance testInstance;
+
+ public SqlppHashJoinRQJTest(TestInstance testInstance) {
+ this.testInstance = testInstance;
+ }
+
+ @Parameterized.Parameters(name = "SqlppHashJoinRQJTest {index}: {0}")
+ public static Collection<TestInstance> tests() {
+ long seed =
SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_SEED,
CONF_PROPERTY_SEED_DEFAULT);
+ int limit =
+ (int)
SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_LIMIT,
CONF_PROPERTY_LIMIT_DEFAULT);
+ int testOffset =
+ (int)
SqlppRQGTestBase.getLongConfigurationProperty(CONF_PROPERTY_OFFSET,
CONF_PROPERTY_OFFSET_DEFAULT);
+
+ LOGGER.info(String.format("Testsuite configuration: -D%s=%d -D%s=%d
-D%s=%d", CONF_PROPERTY_SEED, seed,
+ CONF_PROPERTY_LIMIT, limit, CONF_PROPERTY_OFFSET, testOffset));
+
+ RandomGenerator random = new MersenneTwister(seed);
+ datasetRowCount = randomElement(DATASET_ROWS, random);
+ datasetColumnLength =
+ DATASET_COLUMN_LENGTH_MIN +
random.nextInt(DATASET_COLUMN_LENGTH_MAX - DATASET_COLUMN_LENGTH_MIN);
+
+ LOGGER.info(String.format("Dataset row count=%d, column length=%d",
datasetRowCount, datasetColumnLength));
+
+ LinkedHashMap<IntBuffer, TestInstance> testCases = new
LinkedHashMap<>();
+ int i = 0;
+ while (i < limit) {
+ int c0 = randomElement(DATASET_COLUMNS, random);
+ boolean c0nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+ int c1 = randomElement(DATASET_COLUMNS, random);
+ boolean c1nullable = random.nextInt(NULLABLE_COLUMN_RATIO) == 0;
+ boolean outerJoin = random.nextInt(OUTER_JOIN_RATIO) == 0;
+ boolean broadcast = random.nextInt(BROADCAST_RATIO) == 0;
+ TestInstance test = new TestInstance(i, c0, c0nullable, c1,
c1nullable, outerJoin, broadcast);
+ IntBuffer testSignature = test.signature();
+ if (testCases.containsKey(testSignature)) {
+ continue;
+ }
+ if (i >= testOffset) {
+ testCases.put(testSignature, test);
+ }
+ i++;
+ }
+ return testCases.values();
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ testExecutor = new TestExecutor();
+ LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME,
testExecutor, false);
+
+ FileUtils.forceMkdir(OUTPUT_DIR.toFile());
+ for (String datasetName : DATASET_NAMES) {
+ Path datasetFilePath = OUTPUT_DIR.resolve(datasetName + ".adm");
+ LOGGER.info("Writing data file: " +
datasetFilePath.toAbsolutePath());
+ try (PrintWriter pw = new PrintWriter(datasetFilePath.toFile())) {
+ for (int i = 0; i < datasetRowCount; i++) {
+ writeRecord(pw, datasetName, i);
+ }
+ }
+ }
+
+ StringBuilder sb = new StringBuilder(2048);
+ addDropDataverse(sb, DATAVERSE_NAME);
+ addCreateDataverse(sb, DATAVERSE_NAME);
+ for (String datasetName : DATASET_NAMES) {
+ addCreateDataset(sb, DATAVERSE_NAME, datasetName);
+ addLoadDataset(sb, DATAVERSE_NAME, datasetName);
+ }
+ executeUpdateOrDdl(sb.toString());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Test
+ public void test() throws Exception {
+ LOGGER.info(testInstance);
+ testInstance.execute();
+ }
+
+ private static void addDropDataverse(StringBuilder sb, String
dataverseName) {
+ sb.append(String.format("DROP DATAVERSE %s IF EXISTS;\n",
dataverseName));
+ }
+
+ private static void addCreateDataverse(StringBuilder sb, String
dataverseName) {
+ sb.append(String.format("CREATE DATAVERSE %s;\n", dataverseName));
+ }
+
+ private static void addCreateDataset(StringBuilder sb, String
dataverseName, String datasetName) {
+ sb.append("CREATE DATASET
").append(dataverseName).append('.').append(datasetName);
+ sb.append(" (").append(ID_COLUMN_NAME).append(" string");
+ sb.append(") ");
+ sb.append("OPEN TYPE PRIMARY KEY
").append(ID_COLUMN_NAME).append(";\n");
+ }
+
+ private static void addLoadDataset(StringBuilder sb, String dataverseName,
String datasetName) {
+ sb.append(String.format(
+ "LOAD DATASET %s.%s USING
localfs((`path`=`asterix_nc1://%s/%s.adm`),(`format`=`adm`));%n",
+ dataverseName, datasetName, OUTPUT_DIR, datasetName));
+ }
+
+ private static void writeRecord(PrintWriter pw, String datasetName, int
id) throws IOException {
+ pw.print("{");
+ pw.print(String.format("\"%s\": \"%s:%d\"", ID_COLUMN_NAME,
datasetName, id));
+ int nColumns = DATASET_COLUMNS.size();
+ for (int i = 0; i < nColumns; i++) {
+ long c = DATASET_COLUMNS.get(i);
+ writeColumn(pw, c, false, id); // no NULL/MISSING
+ writeColumn(pw, c, true, id); // with NULL/MISSING
+ }
+ pw.println("}");
+ }
+
+ private static String getColumnName(long c, boolean nullable) {
+ return BASE_COLUMN_NAME + c + (nullable ? "n" : "");
+ }
+
+ private static void writeColumn(Appendable out, long c, boolean nullable,
long id) throws IOException {
+ String columnName = getColumnName(c, nullable);
+ boolean isNull = false;
+ long v;
+ if (nullable) {
+ long r = id % (2 * c);
+ if (r < c) {
+ v = r + 1;
+ } else if (r % 2 == 0) {
+ v = 0;
+ isNull = true;
+ } else {
+ // MISSING -> nothing to do
+ return;
+ }
+ } else {
+ long r = id % c;
+ v = r + 1;
+ }
+ String text;
+ if (isNull) {
+ text = "null";
+ } else {
+ int cLen = datasetColumnLength;
+ StringBuilder textBuilder = new StringBuilder(cLen + 2);
+ textBuilder.append('"').append(v);
+ int pad = cLen - (textBuilder.length() - 1);
+ for (int i = 0; i < pad; i++) {
+ textBuilder.append(' ');
+ }
+ textBuilder.append('"');
+ text = textBuilder.toString();
+ }
+ out.append(String.format(",\"%s\":%s", columnName, text));
+ }
+
+ private static void executeUpdateOrDdl(String statement) throws Exception {
+ LOGGER.debug("Executing: " + statement);
+ testExecutor.executeSqlppUpdateOrDdl(statement,
TestCaseContext.OutputFormat.CLEAN_JSON);
+ }
+
+ private static Pair<ArrayNode, String> executeQuery(String query, boolean
fetchPlan) throws Exception {
+ LOGGER.debug("Executing: " + query);
+
+ List<TestCase.CompilationUnit.Parameter> params;
+ if (fetchPlan) {
+ TestCase.CompilationUnit.Parameter planParameter = new
TestCase.CompilationUnit.Parameter();
+
planParameter.setName(QueryServiceRequestParameters.Parameter.OPTIMIZED_LOGICAL_PLAN.str());
+ planParameter.setValue(Boolean.TRUE.toString());
+ planParameter.setType(ParameterTypeEnum.STRING);
+ params = Collections.singletonList(planParameter);
+ } else {
+ params = Collections.emptyList();
+ }
+
+ try (InputStream resultStream =
testExecutor.executeQueryService(query, TestCaseContext.OutputFormat.CLEAN_JSON,
+ testExecutor.getEndpoint(Servlets.QUERY_SERVICE), params,
true, StandardCharsets.UTF_8)) {
+ JsonNode r = OBJECT_READER.readTree(resultStream);
+ JsonNode errors = r.get("errors");
+ if (errors != null) {
+ Assert.fail("Query failed: " + errors);
+ }
+ JsonNode results = r.get("results");
+ if (!results.isArray()) {
+ Assert.fail("Expected array result, got: " + results);
+ }
+ ArrayNode resultsArray = (ArrayNode) results;
+ String plan = fetchPlan ?
r.get("plans").get("optimizedLogicalPlan").asText() : null;
+ return new Pair<>(resultsArray, plan);
+ }
+ }
+
+ private static <T> T randomElement(List<T> list, RandomGenerator
randomGenerator) {
+ return list.get(randomGenerator.nextInt(list.size()));
+ }
+
+ private static class TestInstance {
+
+ private final int id;
+
+ private final int c0;
+ private final int c1;
+ private final boolean c0nullable;
+ private final boolean c1nullable;
+ private final String col0;
+ private final String col1;
+ private final boolean outerJoin;
+ private final boolean broadcastJoin;
+
+ public TestInstance(int id, int c0, boolean c0nullable, int c1,
boolean c1nullable, boolean outerJoin,
+ boolean broadcastJoin) {
+ this.id = id;
+ this.outerJoin = outerJoin;
+ this.c0 = c0;
+ this.c1 = c1;
+ this.c0nullable = c0nullable;
+ this.c1nullable = c1nullable;
+ this.broadcastJoin = broadcastJoin;
+ this.col0 = getColumnName(c0, c0nullable);
+ this.col1 = getColumnName(c1, c1nullable);
+ }
+
+ IntBuffer signature() {
+ return IntBuffer.wrap(
+ new int[] { c0, toInt(c0nullable), c1, toInt(c1nullable),
toInt(outerJoin), toInt(broadcastJoin) });
+ }
+
+ void execute() throws Exception {
+ String query = createQuery();
+ Pair<ArrayNode, String> res = executeQuery(query, true);
+ String plan = res.second;
+ if
(!plan.contains(PhysicalOperatorTag.HYBRID_HASH_JOIN.toString())) {
+ Assert.fail(PhysicalOperatorTag.HYBRID_HASH_JOIN + " operator
was not used in query plan " + plan);
+ }
+ if (broadcastJoin &&
!plan.contains(PhysicalOperatorTag.BROADCAST_EXCHANGE.toString())) {
+ Assert.fail(PhysicalOperatorTag.BROADCAST_EXCHANGE + "
operator was not used in query plan " + plan);
+ }
+ ArrayNode resultArray = res.first;
+
+ long expectedRowCount;
+ long expectedRowCountInnerJoin = Math.min(c0, c1);
+ if (outerJoin) {
+ expectedRowCount = expectedRowCountInnerJoin + (c0nullable ? 2
: 0) + Math.max(0, c0 - c1);
+ } else {
+ expectedRowCount = expectedRowCountInnerJoin;
+ }
+
+ long expectedAggCountInnerJoin = (datasetRowCount *
datasetRowCount) / (((long) c0) * c1)
+ / (c0nullable ? 2 : 1) / (c1nullable ? 2 : 1);
+
+ int actualRowCount = resultArray.size();
+ if (actualRowCount != expectedRowCount) {
+ String commentHash = String.format("%s;%s", this, query);
+ File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR,
resultArray, id, "hash", commentHash);
+ Assert.fail(String.format("Unexpected row count %d for query
#%d [%s]. Expected row count: %d %n %s ",
+ actualRowCount, id, this, expectedRowCount,
fHash.getAbsolutePath()));
+ }
+
+ String col0Alias = String.format("%s_%s", DATASET_NAMES[0], col0);
+
+ for (int i = 0; i < actualRowCount; i++) {
+ JsonNode resultRecord = resultArray.get(i);
+ long actualAggCount = resultRecord.get("cnt").longValue();
+
+ long expectedAggCount;
+ if (outerJoin) {
+ JsonNode col0Node = resultRecord.get(col0Alias);
+ if (col0Node == null || col0Node.isNull()) {
+ expectedAggCount = datasetRowCount / 4;
+ } else {
+ if (getValueAsLong(col0Node) > c1) {
+ expectedAggCount = datasetRowCount / (c0nullable ?
2 : 1) / c0;
+ } else {
+ expectedAggCount = expectedAggCountInnerJoin;
+ }
+ }
+ } else {
+ expectedAggCount = expectedAggCountInnerJoin;
+ }
+
+ if (actualAggCount != expectedAggCount) {
+ String commentHash = String.format("%s;%s", this, query);
+ File fHash = SqlppRQGTestBase.writeResult(OUTPUT_DIR,
resultArray, id, "hash", commentHash);
+ Assert.fail(String.format(
+ "Unexpected agg count %d in row %d for query #%d
[%s]. Expected agg count: %d %n %s ",
+ actualAggCount, i, id, this, expectedAggCount,
fHash.getAbsolutePath()));
+ }
+ }
+ }
+
+ private long getValueAsLong(JsonNode node) throws Exception {
+ String text = node.textValue().trim();
+ if (text.isEmpty()) {
+ throw new Exception("Unexpected result value: " + node);
+ }
+ try {
+ return Long.parseLong(text);
+ } catch (NumberFormatException e) {
+ throw new Exception("Unexpected result value: " + node);
+ }
+ }
+
+ String createQuery() {
+ return String.format(
+ "USE %s; SELECT t1.%s AS %s_%s, t2.%s AS %s_%s, count(*)
AS cnt FROM %s t1 %s JOIN %s t2 ON t1.%s /*%s*/ = t2.%s /*%s*/ GROUP BY t1.%s,
t2.%s ORDER BY t1.%s, t2.%s",
+ DATAVERSE_NAME, col0, DATASET_NAMES[0], col0, col1,
DATASET_NAMES[1], col1, DATASET_NAMES[0],
+ getJoinKind(), DATASET_NAMES[1], col0, getJoinHint(),
col1, getGroupByHint(), col0, col1, col0,
+ col1);
+ }
+
+ private String getJoinKind() {
+ return outerJoin ? "LEFT OUTER" : "INNER";
+ }
+
+ private String getJoinHint() {
+ return broadcastJoin ? "+" +
SqlppHint.HASH_BROADCAST_JOIN_HINT.getIdentifier() : "";
+ }
+
+ private String getGroupByHint() {
+ return "";
+ }
+
+ @Override
+ public String toString() {
+ return String.format("%s ON %s=%s %s", getJoinKind(), col0, col1,
getJoinHint());
+ }
+
+ static int toInt(boolean b) {
+ return b ? 1 : 0;
+ }
+ }
+}
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
index 5201681..e97a389 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppNumericIndexRQGTest.java
@@ -127,7 +127,7 @@ public class SqlppNumericIndexRQGTest {
@BeforeClass
public static void setUp() throws Exception {
testExecutor = new TestExecutor();
- LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME,
testExecutor);
+ LangExecutionUtil.setUp(SqlppRQGTestBase.TEST_CONFIG_FILE_NAME,
testExecutor, false);
StringBuilder sb = new StringBuilder(2048);
addDropDataverse(sb, DATAVERSE_NAME);
@@ -293,7 +293,7 @@ public class SqlppNumericIndexRQGTest {
}
}
sb.append(") ");
- sb.append("OPEN TYPE PRIMARY KEY id;\n");
+ sb.append("OPEN TYPE PRIMARY KEY
").append(ID_COLUMN_NAME).append(";\n");
}
private static void addLoadDataset(StringBuilder sb, String dataverseName,
String datasetName) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
index ec4b55f..dbf214b 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/SqlppRQGTestBase.java
@@ -381,7 +381,7 @@ public abstract class SqlppRQGTestBase {
protected static void startAsterix() throws Exception {
testExecutor = new TestExecutor();
- LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor, false);
loadAsterixData();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
similarity index 71%
copy from
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
index eb94514..337226d 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.1.query.sqlpp
@@ -17,14 +17,21 @@
* under the License.
*/
-package org.apache.hyracks.api.dataflow.value;
-
-import java.io.Serializable;
-
/*
- * Provides PredicateEvaluator for equi-join related operators
+ * Test hash join when values on both sides are MISSING
*/
-public interface IPredicateEvaluatorFactory extends Serializable {
- public IPredicateEvaluator createPredicateEvaluator();
-}
+with
+R as (
+ from range(1, 50000) r
+ select (case when get_year(current_date()) > 0 then missing else r end) as r
+),
+
+S as (
+ from range(1, 50000) s
+ select (case when get_year(current_date()) > 0 then missing else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
similarity index 71%
copy from
hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
copy to
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
index eb94514..94de090 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/join/hash_join_missing/hash_join_missing.2.query.sqlpp
@@ -17,14 +17,21 @@
* under the License.
*/
-package org.apache.hyracks.api.dataflow.value;
-
-import java.io.Serializable;
-
/*
- * Provides PredicateEvaluator for equi-join related operators
+ * Test hash join when values on both sides are NULL
*/
-public interface IPredicateEvaluatorFactory extends Serializable {
- public IPredicateEvaluator createPredicateEvaluator();
-}
+with
+R as (
+ from range(1, 50000) r
+ select (case when get_year(current_date()) > 0 then null else r end) as r
+),
+
+S as (
+ from range(1, 50000) s
+ select (case when get_year(current_date()) > 0 then null else s end) as s
+)
+
+select count(*) cnt
+from R, S
+where R.r = S.s;
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.1.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
new file mode 100644
index 0000000..bacb60c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/join/hash_join_missing/hash_join_missing.2.adm
@@ -0,0 +1 @@
+{ "cnt": 0 }
\ No newline at end of file
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
index d400c27..c1aed2a 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_sqlpp.xml
@@ -6630,6 +6630,11 @@
</compilation-unit>
</test-case>
<test-case FilePath="join">
+ <compilation-unit name="hash_join_missing">
+ <output-dir compare="Text">hash_join_missing</output-dir>
+ </compilation-unit>
+ </test-case>
+ <test-case FilePath="join">
<compilation-unit name="hash_join_record">
<output-dir compare="Text">hash_join_record</output-dir>
</compilation-unit>
diff --git
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
index ab9f4e0..286d1cc 100644
---
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
+++
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/formats/nontagged/PredicateEvaluatorFactoryProvider.java
@@ -26,31 +26,24 @@ import
org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import
org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
-/*
-Provides PredicateEvaluator for equi-join cases to properly take care of NULL
fields, being compared with each other.
-If any of the join keys, from either side, is NULL, record should not pass
equi-join condition.
-*/
+/**
+ * Provides PredicateEvaluator for equi-join cases to disqualify tuples having
NULL/MISSING fields
+ * If any of the join keys, from either side, is NULL/MISSING, the tuple will
not pass equi-join condition.
+ */
public class PredicateEvaluatorFactoryProvider implements
IPredicateEvaluatorFactoryProvider {
private static final long serialVersionUID = 1L;
public static final PredicateEvaluatorFactoryProvider INSTANCE = new
PredicateEvaluatorFactoryProvider();
@Override
- public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[]
keys0, final int[] keys1) {
+ public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(final int[]
keys) {
return new IPredicateEvaluatorFactory() {
private static final long serialVersionUID = 1L;
@Override
public IPredicateEvaluator createPredicateEvaluator() {
- return new IPredicateEvaluator() {
-
- @Override
- public boolean evaluate(IFrameTupleAccessor fta0, int
tupId0, IFrameTupleAccessor fta1,
- int tupId1) {
- return noNullOrMissingInKeys(fta0, tupId0, keys0) &&
noNullOrMissingInKeys(fta1, tupId1, keys1);
- }
- };
+ return (fta, tupId) -> noNullOrMissingInKeys(fta, tupId, keys);
}
};
}
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
index 7e7d012..cc89deb 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/HybridHashJoinPOperator.java
@@ -109,10 +109,11 @@ public class HybridHashJoinPOperator extends
AbstractHashJoinPOperator {
IBinaryHashFunctionFamily[] rightHashFunFamilies =
JobGenHelper.variablesToBinaryHashFunctionFamilies(keysRightBranch, env,
context);
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
- context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory =
predEvaluatorFactoryProvider == null ? null
- :
predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
+ IPredicateEvaluatorFactoryProvider predEvalFactoryProvider =
context.getPredicateEvaluatorFactoryProvider();
+ IPredicateEvaluatorFactory leftPredEvalFactory =
+ predEvalFactoryProvider == null ? null :
predEvalFactoryProvider.getPredicateEvaluatorFactory(keysLeft);
+ IPredicateEvaluatorFactory rightPredEvalFactory =
predEvalFactoryProvider == null ? null
+ :
predEvalFactoryProvider.getPredicateEvaluatorFactory(keysRight);
RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
@@ -131,7 +132,7 @@ public class HybridHashJoinPOperator extends
AbstractHashJoinPOperator {
opDesc = generateOptimizedHashJoinRuntime(context, joinOp,
inputSchemas, keysLeft, keysRight,
leftHashFunFamilies, rightHashFunFamilies, comparatorFactory,
reverseComparatorFactory,
- predEvaluatorFactory, recDescriptor, spec);
+ leftPredEvalFactory, rightPredEvalFactory, recDescriptor,
spec);
opDesc.setSourceLocation(op.getSourceLocation());
contributeOpDesc(builder, (AbstractLogicalOperator) op, opDesc);
@@ -145,20 +146,20 @@ public class HybridHashJoinPOperator extends
AbstractHashJoinPOperator {
AbstractBinaryJoinOperator joinOp, IOperatorSchema[] inputSchemas,
int[] keysLeft, int[] keysRight,
IBinaryHashFunctionFamily[] leftHashFunFamilies,
IBinaryHashFunctionFamily[] rightHashFunFamilies,
ITuplePairComparatorFactory comparatorFactory,
ITuplePairComparatorFactory reverseComparatorFactory,
- IPredicateEvaluatorFactory predEvaluatorFactory, RecordDescriptor
recDescriptor,
- IOperatorDescriptorRegistry spec) throws AlgebricksException {
+ IPredicateEvaluatorFactory leftPredEvalFactory,
IPredicateEvaluatorFactory rightPredEvalFactory,
+ RecordDescriptor recDescriptor, IOperatorDescriptorRegistry spec)
throws AlgebricksException {
int memSizeInFrames =
localMemoryRequirements.getMemoryBudgetInFrames();
switch (kind) {
case INNER:
return new OptimizedHybridHashJoinOperatorDescriptor(spec,
memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight,
leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
- comparatorFactory, reverseComparatorFactory,
predEvaluatorFactory);
+ comparatorFactory, reverseComparatorFactory,
leftPredEvalFactory, rightPredEvalFactory);
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories =
JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(),
inputSchemas[1].getSize());
return new OptimizedHybridHashJoinOperatorDescriptor(spec,
memSizeInFrames, maxInputBuildSizeInFrames,
getFudgeFactor(), keysLeft, keysRight,
leftHashFunFamilies, rightHashFunFamilies, recDescriptor,
- comparatorFactory, reverseComparatorFactory,
predEvaluatorFactory, true,
+ comparatorFactory, reverseComparatorFactory,
leftPredEvalFactory, rightPredEvalFactory, true,
nonMatchWriterFactories);
default:
throw new NotImplementedException();
diff --git
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
index 84dda11..b96b887 100644
---
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
+++
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/InMemoryHashJoinPOperator.java
@@ -44,8 +44,6 @@ import
org.apache.hyracks.algebricks.runtime.evaluators.TuplePairEvaluatorFactor
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
-import
org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactoryProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
@@ -93,11 +91,6 @@ public class InMemoryHashJoinPOperator extends
AbstractHashJoinPOperator {
IBinaryHashFunctionFactory[] rightHashFunFactories =
JobGenHelper.variablesToBinaryHashFunctionFactories(keysRightBranch, env,
context);
- IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider =
- context.getPredicateEvaluatorFactoryProvider();
- IPredicateEvaluatorFactory predEvaluatorFactory =
predEvaluatorFactoryProvider == null ? null
- :
predEvaluatorFactoryProvider.getPredicateEvaluatorFactory(keysLeft, keysRight);
-
RecordDescriptor recDescriptor =
JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op),
propagatedSchema, context);
IOperatorSchema[] conditionInputSchemas = new IOperatorSchema[1];
@@ -116,15 +109,14 @@ public class InMemoryHashJoinPOperator extends
AbstractHashJoinPOperator {
switch (kind) {
case INNER:
opDesc = new InMemoryHashJoinOperatorDescriptor(spec,
keysLeft, keysRight, leftHashFunFactories,
- rightHashFunFactories, comparatorFactory,
recDescriptor, tableSize, predEvaluatorFactory,
- memSizeInFrames);
+ rightHashFunFactories, comparatorFactory,
recDescriptor, tableSize, memSizeInFrames);
break;
case LEFT_OUTER:
IMissingWriterFactory[] nonMatchWriterFactories =
JobGenHelper.createMissingWriterFactories(context,
((LeftOuterJoinOperator) joinOp).getMissingValue(),
inputSchemas[1].getSize());
opDesc = new InMemoryHashJoinOperatorDescriptor(spec,
keysLeft, keysRight, leftHashFunFactories,
- rightHashFunFactories, comparatorFactory,
predEvaluatorFactory, recDescriptor, true,
- nonMatchWriterFactories, tableSize, memSizeInFrames);
+ rightHashFunFactories, comparatorFactory,
recDescriptor, true, nonMatchWriterFactories,
+ tableSize, memSizeInFrames);
break;
default:
throw new NotImplementedException();
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
index 5fbc1ab..20381f3 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluator.java
@@ -21,9 +21,6 @@ package org.apache.hyracks.api.dataflow.value;
import org.apache.hyracks.api.comm.IFrameTupleAccessor;
-/*
- * Compares two tuples to make sure that records, whose comparison keys are
NULL do not pass comparator filter
- */
public interface IPredicateEvaluator {
- public boolean evaluate(IFrameTupleAccessor fta0, int tupId0,
IFrameTupleAccessor fta1, int tupId1);
+ boolean evaluate(IFrameTupleAccessor fta, int tupId);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
index eb94514..8748587 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactory.java
@@ -22,9 +22,8 @@ package org.apache.hyracks.api.dataflow.value;
import java.io.Serializable;
/*
- * Provides PredicateEvaluator for equi-join related operators
+ * Provides PredicateEvaluator
*/
-
public interface IPredicateEvaluatorFactory extends Serializable {
- public IPredicateEvaluator createPredicateEvaluator();
+ IPredicateEvaluator createPredicateEvaluator();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
index 3eefc29..bc3e5c8 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataflow/value/IPredicateEvaluatorFactoryProvider.java
@@ -24,7 +24,6 @@ import java.io.Serializable;
/*
* Provides PredicateEvaluatorFactory based on (equi-join) keys
*/
-
public interface IPredicateEvaluatorFactoryProvider extends Serializable {
- public IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[]
keys0, int[] keys1);
+ IPredicateEvaluatorFactory getPredicateEvaluatorFactory(int[] keys);
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
index 398dee9..7cee242 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/HybridHashJoinUtil.java
@@ -28,16 +28,21 @@ public class HybridHashJoinUtil {
private HybridHashJoinUtil() {
}
+ public enum SIDE {
+ BUILD,
+ PROBE
+ }
+
/**
* Prints out the detailed information for partitions: in-memory and
spilled partitions.
* This method exists for a debug purpose.
*/
- public String printPartitionInfo(BitSet spilledStatus,
OptimizedHybridHashJoin.SIDE whichSide, int numOfPartitions,
- int[] probePSizeInTups, int[] buildPSizeInTups, RunFileWriter[]
probeRFWriters,
- RunFileWriter[] buildRFWriters, IPartitionedTupleBufferManager
bufferManager) {
+ public String printPartitionInfo(BitSet spilledStatus, SIDE whichSide, int
numOfPartitions, int[] probePSizeInTups,
+ int[] buildPSizeInTups, RunFileWriter[] probeRFWriters,
RunFileWriter[] buildRFWriters,
+ IPartitionedTupleBufferManager bufferManager) {
StringBuilder buf = new StringBuilder();
buf.append(">>> " + this + " " + Thread.currentThread().getId() + "
printInfo():" + "\n");
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
buf.append("BUILD side" + "\n");
} else {
buf.append("PROBE side" + "\n");
@@ -49,7 +54,7 @@ public class HybridHashJoinUtil {
int spilledPartByteSize = 0;
for (int pid = spilledStatus.nextSetBit(0); pid >= 0 && pid <
numOfPartitions; pid =
spilledStatus.nextSetBit(pid + 1)) {
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
spilledTupleCount += buildPSizeInTups[pid];
spilledPartByteSize += buildRFWriters[pid].getFileSize();
buf.append("part:\t" + pid + "\t#tuple:\t" +
buildPSizeInTups[pid] + "\tsize(MB):\t"
@@ -70,7 +75,7 @@ public class HybridHashJoinUtil {
int inMemoryPartByteSize = 0;
for (int pid = spilledStatus.nextClearBit(0); pid >= 0 && pid <
numOfPartitions; pid =
spilledStatus.nextClearBit(pid + 1)) {
- if (whichSide == OptimizedHybridHashJoin.SIDE.BUILD) {
+ if (whichSide == SIDE.BUILD) {
inMemoryTupleCount += buildPSizeInTups[pid];
inMemoryPartByteSize += bufferManager.getPhysicalSize(pid);
} else {
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
index cb63b6a..4c2019e 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoin.java
@@ -28,7 +28,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksFrameMgrContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -59,7 +58,6 @@ public class InMemoryHashJoin {
private final ISerializableTable table;
private final TuplePointer storedTuplePointer;
private final boolean reverseOutputOrder; //Should we reverse the order of
tuples, we are writing in output
- private final IPredicateEvaluator predEvaluator;
private final TupleInFrameListAccessor tupleAccessor;
// To release frames
private final ISimpleFrameBufferManager bufferManager;
@@ -70,17 +68,16 @@ public class InMemoryHashJoin {
public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor
accessorProbe,
ITuplePartitionComputer tpcProbe, FrameTupleAccessor
accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild, boolean isLeftOuter,
IMissingWriter[] missingWritersBuild,
- ISerializableTable table, IPredicateEvaluator predEval,
ISimpleFrameBufferManager bufferManager)
- throws HyracksDataException {
+ ISerializableTable table, ISimpleFrameBufferManager bufferManager)
throws HyracksDataException {
this(ctx, accessorProbe, tpcProbe, accessorBuild, rDBuild, tpcBuild,
isLeftOuter, missingWritersBuild, table,
- predEval, false, bufferManager);
+ false, bufferManager);
}
public InMemoryHashJoin(IHyracksFrameMgrContext ctx, FrameTupleAccessor
accessorProbe,
ITuplePartitionComputer tpcProbe, FrameTupleAccessor
accessorBuild, RecordDescriptor rDBuild,
ITuplePartitionComputer tpcBuild, boolean isLeftOuter,
IMissingWriter[] missingWritersBuild,
- ISerializableTable table, IPredicateEvaluator predEval, boolean
reverse,
- ISimpleFrameBufferManager bufferManager) throws
HyracksDataException {
+ ISerializableTable table, boolean reverse,
ISimpleFrameBufferManager bufferManager)
+ throws HyracksDataException {
this.table = table;
storedTuplePointer = new TuplePointer();
buffers = new ArrayList<>();
@@ -89,7 +86,6 @@ public class InMemoryHashJoin {
this.accessorProbe = accessorProbe;
this.tpcProbe = tpcProbe;
appender = new FrameTupleAppender(new VSizeFrame(ctx));
- predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
int fieldCountOuter = accessorBuild.getFieldCount();
@@ -178,11 +174,8 @@ public class InMemoryHashJoin {
accessorBuild.reset(buffers.get(bIndex));
int c = tpComparator.compare(accessorProbe, tid,
accessorBuild, tIndex);
if (c == 0) {
- boolean predEval = evaluatePredicate(tid, tIndex);
- if (predEval) {
- matchFound = true;
- appendToResult(tid, tIndex, writer);
- }
+ matchFound = true;
+ appendToResult(tid, tIndex, writer);
}
}
}
@@ -228,14 +221,6 @@ public class InMemoryHashJoin {
table.close();
}
- private boolean evaluatePredicate(int tIx1, int tIx2) {
- if (reverseOutputOrder) { //Role Reversal Optimization is triggered
- return (predEvaluator == null) ||
predEvaluator.evaluate(accessorBuild, tIx2, accessorProbe, tIx1);
- } else {
- return (predEvaluator == null) ||
predEvaluator.evaluate(accessorProbe, tIx1, accessorBuild, tIx2);
- }
- }
-
private void appendToResult(int probeSidetIx, int buildSidetIx,
IFrameWriter writer) throws HyracksDataException {
if (reverseOutputOrder) {
FrameUtils.appendConcatToWriter(writer, appender, accessorBuild,
buildSidetIx, accessorProbe, probeSidetIx);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
index 33976a8..f89ccb0 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/InMemoryHashJoinOperatorDescriptor.java
@@ -32,8 +32,6 @@ import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -63,7 +61,6 @@ public class InMemoryHashJoinOperatorDescriptor extends
AbstractOperatorDescript
private final IBinaryHashFunctionFactory[] hashFunctionFactories0;
private final IBinaryHashFunctionFactory[] hashFunctionFactories1;
private final ITuplePairComparatorFactory comparatorFactory;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nonMatchWriterFactories;
private final int tableSize;
@@ -73,23 +70,21 @@ public class InMemoryHashJoinOperatorDescriptor extends
AbstractOperatorDescript
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry
spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories0,
IBinaryHashFunctionFactory[] hashFunctionFactories1,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor
recordDescriptor, int tableSize,
- IPredicateEvaluatorFactory predEvalFactory, int memSizeInFrames) {
- this(spec, keys0, keys1, hashFunctionFactories0,
hashFunctionFactories1, comparatorFactory, predEvalFactory,
- recordDescriptor, false, null, tableSize, memSizeInFrames);
+ int memSizeInFrames) {
+ this(spec, keys0, keys1, hashFunctionFactories0,
hashFunctionFactories1, comparatorFactory, recordDescriptor,
+ false, null, tableSize, memSizeInFrames);
}
public InMemoryHashJoinOperatorDescriptor(IOperatorDescriptorRegistry
spec, int[] keys0, int[] keys1,
IBinaryHashFunctionFactory[] hashFunctionFactories0,
IBinaryHashFunctionFactory[] hashFunctionFactories1,
- ITuplePairComparatorFactory comparatorFactory,
IPredicateEvaluatorFactory predEvalFactory,
- RecordDescriptor recordDescriptor, boolean isLeftOuter,
IMissingWriterFactory[] missingWriterFactories1,
- int tableSize, int memSizeInFrames) {
+ ITuplePairComparatorFactory comparatorFactory, RecordDescriptor
recordDescriptor, boolean isLeftOuter,
+ IMissingWriterFactory[] missingWriterFactories1, int tableSize,
int memSizeInFrames) {
super(spec, 2, 1);
this.keys0 = keys0;
this.keys1 = keys1;
this.hashFunctionFactories0 = hashFunctionFactories0;
this.hashFunctionFactories1 = hashFunctionFactories1;
this.comparatorFactory = comparatorFactory;
- this.predEvaluatorFactory = predEvalFactory;
this.outRecDescs[0] = recordDescriptor;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = missingWriterFactories1;
@@ -159,8 +154,6 @@ public class InMemoryHashJoinOperatorDescriptor extends
AbstractOperatorDescript
} else {
nullWriters1 = null;
}
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory == null ? null :
predEvaluatorFactory.createPredicateEvaluator());
final int memSizeInBytes = memSizeInFrames *
jobletCtx.getInitialFrameSize();
final IDeallocatableFramePool framePool = new
DeallocatableFramePool(jobletCtx, memSizeInBytes);
@@ -178,8 +171,7 @@ public class InMemoryHashJoinOperatorDescriptor extends
AbstractOperatorDescript
state = new HashBuildTaskState(jobletCtx.getJobId(), new
TaskId(getActivityId(), partition));
ISerializableTable table = new
SerializableHashTable(tableSize, jobletCtx, bufferManager);
state.joiner = new InMemoryHashJoin(jobletCtx, new
FrameTupleAccessor(rd0), hpc0,
- new FrameTupleAccessor(rd1), rd1, hpc1,
isLeftOuter, nullWriters1, table, predEvaluator,
- bufferManager);
+ new FrameTupleAccessor(rd1), rd1, hpc1,
isLeftOuter, nullWriters1, table, bufferManager);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
index 03ff72f..414ae25 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoin.java
@@ -27,7 +27,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -63,22 +62,20 @@ public class NestedLoopJoin {
private final RunFileWriter runFileWriter;
private final boolean isLeftOuter;
private final ArrayTupleBuilder missingTupleBuilder;
- private final IPredicateEvaluator predEvaluator;
- // Added for handling correct calling for predicate-evaluator upon
recursive calls
+ // Tracks recursive calls
// (in OptimizedHybridHashJoin) that cause role-reversal
private final boolean isReversed;
private final BufferInfo tempInfo = new BufferInfo(null, -1, -1);
private final BitSet outerMatchLOJ;
public NestedLoopJoin(IHyracksJobletContext jobletContext,
FrameTupleAccessor accessorOuter,
- FrameTupleAccessor accessorInner, int memBudgetInFrames,
IPredicateEvaluator predEval, boolean isLeftOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean
isLeftOuter,
IMissingWriter[] missingWriters) throws HyracksDataException {
- this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames,
predEval, isLeftOuter, missingWriters,
- false);
+ this(jobletContext, accessorOuter, accessorInner, memBudgetInFrames,
isLeftOuter, missingWriters, false);
}
public NestedLoopJoin(IHyracksJobletContext jobletContext,
FrameTupleAccessor accessorOuter,
- FrameTupleAccessor accessorInner, int memBudgetInFrames,
IPredicateEvaluator predEval, boolean isLeftOuter,
+ FrameTupleAccessor accessorInner, int memBudgetInFrames, boolean
isLeftOuter,
IMissingWriter[] missingWriters, boolean isReversed) throws
HyracksDataException {
this.accessorInner = accessorInner;
this.accessorOuter = accessorOuter;
@@ -97,7 +94,6 @@ public class NestedLoopJoin {
new VariableFramePool(jobletContext,
outerBufferMngrMemBudgetInBytes), FrameFreeSlotPolicyFactory
.createFreeSlotPolicy(EnumFreeSlotPolicy.LAST_FIT,
outerBufferMngrMemBudgetInFrames));
- this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
if (isLeftOuter) {
if (isReversed) {
@@ -202,8 +198,7 @@ public class NestedLoopJoin {
boolean matchFound = false;
for (int j = 0; j < innerTupleCount; ++j) {
int c = tpComparator.compare(accessorOuter, i, accessorInner,
j);
- boolean prdEval = evaluatePredicate(i, j);
- if (c == 0 && prdEval) {
+ if (c == 0) {
matchFound = true;
appendToResults(i, j, writer);
}
@@ -214,14 +209,6 @@ public class NestedLoopJoin {
}
}
- private boolean evaluatePredicate(int tIx1, int tIx2) {
- if (isReversed) { //Role Reversal Optimization is triggered
- return ((predEvaluator == null) ||
predEvaluator.evaluate(accessorInner, tIx2, accessorOuter, tIx1));
- } else {
- return ((predEvaluator == null) ||
predEvaluator.evaluate(accessorOuter, tIx1, accessorInner, tIx2));
- }
- }
-
private void appendToResults(int outerTupleId, int innerTupleId,
IFrameWriter writer) throws HyracksDataException {
if (isReversed) {
appendResultToFrame(accessorInner, innerTupleId, accessorOuter,
outerTupleId, writer);
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
index 1de8094..24e1b45 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/NestedLoopJoinOperatorDescriptor.java
@@ -29,8 +29,6 @@ import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IMissingWriter;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluatorFactory;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparatorFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
@@ -52,25 +50,16 @@ public class NestedLoopJoinOperatorDescriptor extends
AbstractOperatorDescriptor
private static final long serialVersionUID = 1L;
private final ITuplePairComparatorFactory comparatorFactory;
private final int memSize;
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nullWriterFactories1;
public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
ITuplePairComparatorFactory comparatorFactory, RecordDescriptor
recordDescriptor, int memSize,
boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1)
{
- this(spec, comparatorFactory, recordDescriptor, memSize, null,
isLeftOuter, nullWriterFactories1);
- }
-
- public NestedLoopJoinOperatorDescriptor(IOperatorDescriptorRegistry spec,
- ITuplePairComparatorFactory comparatorFactory, RecordDescriptor
recordDescriptor, int memSize,
- IPredicateEvaluatorFactory predEvalFactory, boolean isLeftOuter,
- IMissingWriterFactory[] nullWriterFactories1) {
super(spec, 2, 1);
this.comparatorFactory = comparatorFactory;
this.outRecDescs[0] = recordDescriptor;
this.memSize = memSize;
- this.predEvaluatorFactory = predEvalFactory;
this.isLeftOuter = isLeftOuter;
this.nullWriterFactories1 = nullWriterFactories1;
}
@@ -117,8 +106,6 @@ public class NestedLoopJoinOperatorDescriptor extends
AbstractOperatorDescriptor
final IHyracksJobletContext jobletCtx = ctx.getJobletContext();
final RecordDescriptor rd0 =
recordDescProvider.getInputRecordDescriptor(nljAid, 0);
final RecordDescriptor rd1 =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory != null) ?
predEvaluatorFactory.createPredicateEvaluator() : null;
final IMissingWriter[] nullWriters1 = isLeftOuter ? new
IMissingWriter[nullWriterFactories1.length] : null;
if (isLeftOuter) {
@@ -134,8 +121,7 @@ public class NestedLoopJoinOperatorDescriptor extends
AbstractOperatorDescriptor
public void open() throws HyracksDataException {
state = new JoinCacheTaskState(jobletCtx.getJobId(), new
TaskId(getActivityId(), partition));
state.joiner = new NestedLoopJoin(jobletCtx, new
FrameTupleAccessor(rd0),
- new FrameTupleAccessor(rd1), memSize,
predEvaluator, isLeftOuter, nullWriters1);
-
+ new FrameTupleAccessor(rd1), memSize, isLeftOuter,
nullWriters1);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
index f5eb4f7..7a9bb25 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoin.java
@@ -59,11 +59,6 @@ public class OptimizedHybridHashJoin {
// Used for special probe BigObject which can not be held into the Join
memory
private FrameTupleAppender bigFrameAppender;
- public enum SIDE {
- BUILD,
- PROBE
- }
-
private final IHyracksJobletContext jobletCtx;
private final String buildRelName;
@@ -74,7 +69,8 @@ public class OptimizedHybridHashJoin {
private final RecordDescriptor probeRd;
private final RunFileWriter[] buildRFWriters; //writing spilled build
partitions
private final RunFileWriter[] probeRFWriters; //writing spilled probe
partitions
- private final IPredicateEvaluator predEvaluator;
+ private final IPredicateEvaluator buildPredEval;
+ private final IPredicateEvaluator probePredEval;
private final boolean isLeftOuter;
private final IMissingWriter[] nonMatchWriters;
private final BitSet spilledStatus; //0=resident, 1=spilled
@@ -98,8 +94,8 @@ public class OptimizedHybridHashJoin {
public OptimizedHybridHashJoin(IHyracksJobletContext jobletCtx, int
memSizeInFrames, int numOfPartitions,
String probeRelName, String buildRelName, RecordDescriptor
probeRd, RecordDescriptor buildRd,
- ITuplePartitionComputer probeHpc, ITuplePartitionComputer
buildHpc, IPredicateEvaluator predEval,
- boolean isLeftOuter, IMissingWriterFactory[] nullWriterFactories1)
{
+ ITuplePartitionComputer probeHpc, ITuplePartitionComputer
buildHpc, IPredicateEvaluator probePredEval,
+ IPredicateEvaluator buildPredEval, boolean isLeftOuter,
IMissingWriterFactory[] nullWriterFactories1) {
this.jobletCtx = jobletCtx;
this.memSizeInFrames = memSizeInFrames;
this.buildRd = buildRd;
@@ -113,8 +109,12 @@ public class OptimizedHybridHashJoin {
this.probeRFWriters = new RunFileWriter[numOfPartitions];
this.accessorBuild = new FrameTupleAccessor(buildRd);
this.accessorProbe = new FrameTupleAccessor(probeRd);
- this.predEvaluator = predEval;
this.isLeftOuter = isLeftOuter;
+ if (isLeftOuter && probePredEval != null) {
+ throw new IllegalStateException();
+ }
+ this.buildPredEval = buildPredEval;
+ this.probePredEval = probePredEval;
this.isReversed = false;
this.spilledStatus = new BitSet(numOfPartitions);
this.nonMatchWriters = isLeftOuter ? new
IMissingWriter[nullWriterFactories1.length] : null;
@@ -141,11 +141,12 @@ public class OptimizedHybridHashJoin {
accessorBuild.reset(buffer);
int tupleCount = accessorBuild.getTupleCount();
for (int i = 0; i < tupleCount; ++i) {
- int pid = buildHpc.partition(accessorBuild, i, numOfPartitions);
- processTupleBuildPhase(i, pid);
- buildPSizeInTups[pid]++;
+ if (buildPredEval == null || buildPredEval.evaluate(accessorBuild,
i)) {
+ int pid = buildHpc.partition(accessorBuild, i,
numOfPartitions);
+ processTupleBuildPhase(i, pid);
+ buildPSizeInTups[pid]++;
+ }
}
-
}
private void processTupleBuildPhase(int tid, int pid) throws
HyracksDataException {
@@ -217,8 +218,8 @@ public class OptimizedHybridHashJoin {
ISerializableTable table = new SerializableHashTable(inMemTupCount,
jobletCtx, bufferManagerForHashTable);
this.inMemJoiner = new InMemoryHashJoin(jobletCtx, new
FrameTupleAccessor(probeRd), probeHpc,
- new FrameTupleAccessor(buildRd), buildRd, buildHpc,
isLeftOuter, nonMatchWriters, table, predEvaluator,
- isReversed, bufferManagerForHashTable);
+ new FrameTupleAccessor(buildRd), buildRd, buildHpc,
isLeftOuter, nonMatchWriters, table, isReversed,
+ bufferManagerForHashTable);
buildHashTable();
}
@@ -473,22 +474,28 @@ public class OptimizedHybridHashJoin {
public void probe(ByteBuffer buffer, IFrameWriter writer) throws
HyracksDataException {
accessorProbe.reset(buffer);
int tupleCount = accessorProbe.getTupleCount();
-
- if (isBuildRelAllInMemory()) {
- inMemJoiner.join(buffer, writer);
- return;
- }
inMemJoiner.resetAccessorProbe(accessorProbe);
- for (int i = 0; i < tupleCount; ++i) {
- int pid = probeHpc.partition(accessorProbe, i, numOfPartitions);
-
- if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple has
potential match from previous phase
- if (spilledStatus.get(pid)) { //pid is Spilled
- processTupleProbePhase(i, pid);
- } else { //pid is Resident
+ if (isBuildRelAllInMemory()) {
+ for (int i = 0; i < tupleCount; ++i) {
+ // NOTE: probePredEval is guaranteed to be 'null' for outer
join and in case of role reversal
+ if (probePredEval == null ||
probePredEval.evaluate(accessorProbe, i)) {
inMemJoiner.join(i, writer);
}
- probePSizeInTups[pid]++;
+ }
+ } else {
+ for (int i = 0; i < tupleCount; ++i) {
+ // NOTE: probePredEval is guaranteed to be 'null' for outer
join and in case of role reversal
+ if (probePredEval == null ||
probePredEval.evaluate(accessorProbe, i)) {
+ int pid = probeHpc.partition(accessorProbe, i,
numOfPartitions);
+ if (buildPSizeInTups[pid] > 0 || isLeftOuter) { //Tuple
has potential match from previous phase
+ if (spilledStatus.get(pid)) { //pid is Spilled
+ processTupleProbePhase(i, pid);
+ } else { //pid is Resident
+ inMemJoiner.join(i, writer);
+ }
+ probePSizeInTups[pid]++;
+ }
+ }
}
}
}
@@ -600,7 +607,10 @@ public class OptimizedHybridHashJoin {
return bufferManager.getPhysicalSize(pid);
}
- public void setIsReversed(boolean b) {
- this.isReversed = b;
+ public void setIsReversed(boolean reversed) {
+ if (reversed && (buildPredEval != null || probePredEval != null)) {
+ throw new IllegalStateException();
+ }
+ this.isReversed = reversed;
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
index dcccd61..555e8fb 100644
---
a/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
+++
b/hyracks-fullstack/hyracks/hyracks-dataflow-std/src/main/java/org/apache/hyracks/dataflow/std/join/OptimizedHybridHashJoinOperatorDescriptor.java
@@ -131,7 +131,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
private final IBinaryHashFunctionFamily[] buildHashFunctionFactories;
private final ITuplePairComparatorFactory
tuplePairComparatorFactoryProbe2Build; //For HHJ & NLJ in probe
private final ITuplePairComparatorFactory
tuplePairComparatorFactoryBuild2Probe; //For HHJ & NLJ in probe
- private final IPredicateEvaluatorFactory predEvaluatorFactory;
+ private final IPredicateEvaluatorFactory probePredEvalFactory;
+ private final IPredicateEvaluatorFactory buildPredEvalFactory;
private final boolean isLeftOuter;
private final IMissingWriterFactory[] nonMatchWriterFactories;
@@ -148,8 +149,9 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
IBinaryHashFunctionFamily[] propHashFunctionFactories,
IBinaryHashFunctionFamily[] buildHashFunctionFactories,
RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory01,
- ITuplePairComparatorFactory tupPaircomparatorFactory10,
IPredicateEvaluatorFactory predEvaluatorFactory,
- boolean isLeftOuter, IMissingWriterFactory[]
nonMatchWriterFactories) {
+ ITuplePairComparatorFactory tupPaircomparatorFactory10,
IPredicateEvaluatorFactory predEvalFactory0,
+ IPredicateEvaluatorFactory predEvalFactory1, boolean isLeftOuter,
+ IMissingWriterFactory[] nonMatchWriterFactories) {
super(spec, 2, 1);
this.memSizeInFrames = memSizeInFrames;
this.inputsize0 = inputsize0;
@@ -161,7 +163,8 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
this.tuplePairComparatorFactoryProbe2Build =
tupPaircomparatorFactory01;
this.tuplePairComparatorFactoryBuild2Probe =
tupPaircomparatorFactory10;
outRecDescs[0] = recordDescriptor;
- this.predEvaluatorFactory = predEvaluatorFactory;
+ this.probePredEvalFactory = predEvalFactory0;
+ this.buildPredEvalFactory = predEvalFactory1;
this.isLeftOuter = isLeftOuter;
this.nonMatchWriterFactories = nonMatchWriterFactories;
}
@@ -171,10 +174,11 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
IBinaryHashFunctionFamily[] propHashFunctionFactories,
IBinaryHashFunctionFamily[] buildHashFunctionFactories,
RecordDescriptor recordDescriptor,
ITuplePairComparatorFactory tupPaircomparatorFactory01,
- ITuplePairComparatorFactory tupPaircomparatorFactory10,
IPredicateEvaluatorFactory predEvaluatorFactory) {
+ ITuplePairComparatorFactory tupPaircomparatorFactory10,
IPredicateEvaluatorFactory predEvalFactory0,
+ IPredicateEvaluatorFactory predEvalFactory1) {
this(spec, memSizeInFrames, inputsize0, factor, keys0, keys1,
propHashFunctionFactories,
buildHashFunctionFactories, recordDescriptor,
tupPaircomparatorFactory01, tupPaircomparatorFactory10,
- predEvaluatorFactory, false, null);
+ predEvalFactory0, predEvalFactory1, false, null);
}
@Override
@@ -268,8 +272,10 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
final RecordDescriptor buildRd =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final RecordDescriptor probeRd =
recordDescProvider.getInputRecordDescriptor(probeAid, 0);
- final IPredicateEvaluator predEvaluator =
- (predEvaluatorFactory == null ? null :
predEvaluatorFactory.createPredicateEvaluator());
+ final IPredicateEvaluator buildPredEval =
+ (buildPredEvalFactory == null ? null :
buildPredEvalFactory.createPredicateEvaluator());
+ final IPredicateEvaluator probePredEval = (probePredEvalFactory ==
null || isLeftOuter ? null
+ : probePredEvalFactory.createPredicateEvaluator());
return new AbstractUnaryInputSinkOperatorNodePushable() {
private BuildAndPartitionTaskState state = new
BuildAndPartitionTaskState(
@@ -293,7 +299,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
getNumberOfPartitions(state.memForJoin,
inputsize0, fudgeFactor, nPartitions);
state.hybridHJ = new
OptimizedHybridHashJoin(ctx.getJobletContext(), state.memForJoin,
state.numOfPartitions, PROBE_REL, BUILD_REL,
probeRd, buildRd, probeHpc, buildHpc,
- predEvaluator, isLeftOuter,
nonMatchWriterFactories);
+ probePredEval, buildPredEval, isLeftOuter,
nonMatchWriterFactories);
state.hybridHJ.initBuild();
if (LOGGER.isTraceEnabled()) {
@@ -366,8 +372,6 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
final RecordDescriptor probeRd =
recordDescProvider.getInputRecordDescriptor(getActivityId(), 0);
final ITuplePairComparator probComp =
tuplePairComparatorFactoryProbe2Build.createTuplePairComparator(ctx);
final ITuplePairComparator buildComp =
tuplePairComparatorFactoryBuild2Probe.createTuplePairComparator(ctx);
- final IPredicateEvaluator predEvaluator =
- predEvaluatorFactory == null ? null :
predEvaluatorFactory.createPredicateEvaluator();
final IMissingWriter[] nonMatchWriter =
isLeftOuter ? new
IMissingWriter[nonMatchWriterFactories.length] : null;
@@ -593,7 +597,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
OptimizedHybridHashJoin rHHj;
int n = getNumberOfPartitions(state.memForJoin, tableSize,
fudgeFactor, nPartitions);
rHHj = new OptimizedHybridHashJoin(jobletCtx,
state.memForJoin, n, PROBE_REL, BUILD_REL, probeRd,
- buildRd, probeHpc, buildHpc, predEvaluator,
isLeftOuter, nonMatchWriterFactories); //checked-confirmed
+ buildRd, probeHpc, buildHpc, null, null,
isLeftOuter, nonMatchWriterFactories); //checked-confirmed
rHHj.setIsReversed(isReversed);
try {
@@ -748,7 +752,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
ISerializableTable table = new
SerializableHashTable(tabSize, jobletCtx, bufferManager);
InMemoryHashJoin joiner = new InMemoryHashJoin(jobletCtx,
new FrameTupleAccessor(probeRDesc),
hpcRepProbe, new FrameTupleAccessor(buildRDesc),
buildRDesc, hpcRepBuild, isLeftOuter,
- nonMatchWriter, table, predEvaluator, isReversed,
bufferManager);
+ nonMatchWriter, table, isReversed, bufferManager);
joiner.setComparator(comp);
try {
bReader.open();
@@ -805,8 +809,7 @@ public class OptimizedHybridHashJoinOperatorDescriptor
extends AbstractOperatorD
boolean isReversed = outerRd == buildRd && innerRd ==
probeRd;
ITuplePairComparator nljComptorOuterInner = isReversed ?
buildComp : probComp;
NestedLoopJoin nlj = new NestedLoopJoin(jobletCtx, new
FrameTupleAccessor(outerRd),
- new FrameTupleAccessor(innerRd), memorySize,
predEvaluator, isLeftOuter, nonMatchWriter,
- isReversed);
+ new FrameTupleAccessor(innerRd), memorySize,
isLeftOuter, nonMatchWriter, isReversed);
nlj.setComparator(nljComptorOuterInner);
IFrame cacheBuff = new VSizeFrame(jobletCtx);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
index 93739df..9215856 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/OptimizedHybridHashJoinTest.java
@@ -26,7 +26,6 @@ import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksJobletContext;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFamily;
-import org.apache.hyracks.api.dataflow.value.IPredicateEvaluator;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITuplePairComparator;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionComputer;
@@ -61,7 +60,6 @@ public class OptimizedHybridHashJoinTest {
static RecordDescriptor buildRd;
static ITuplePartitionComputer probeHpc;
static ITuplePartitionComputer buildHpc;
- static IPredicateEvaluator predEval;
int memSizeInFrames = -1;
int numOfPartitions = -1;
boolean isLeftOuter = false;
@@ -151,7 +149,7 @@ public class OptimizedHybridHashJoinTest {
private void testJoin(int memSizeInFrames, int numOfPartitions, VSizeFrame
frame) throws HyracksDataException {
hhj = new OptimizedHybridHashJoin(ctx, memSizeInFrames,
numOfPartitions, probeRelName, buildRelName, probeRd,
- buildRd, probeHpc, buildHpc, predEval, isLeftOuter, null);
+ buildRd, probeHpc, buildHpc, null, null, isLeftOuter, null);
hhj.initBuild();
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
index a8bb1d5..296b682 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/integration/TPCHCustomerOrderHashJoinTest.java
@@ -133,7 +133,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -176,12 +176,12 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new DelimitedDataTupleParserFactory(custValueParserFactories,
'|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
custScanner, NC1_ID);
- OptimizedHybridHashJoinOperatorDescriptor join =
- new OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
1.2, new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null,
false, null);
+ OptimizedHybridHashJoinOperatorDescriptor join = new
OptimizedHybridHashJoinOperatorDescriptor(spec, 32, 20,
+ 1.2, new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+ new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null,
null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
@@ -234,8 +234,8 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new int[] { 1 },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null,
custOrderJoinDesc,
- true, nonMatchWriterFactories, 128, 128);
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
custOrderJoinDesc, true,
+ nonMatchWriterFactories, 128, 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -288,7 +288,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
true,
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
null, true,
nonMatchWriterFactories);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
@@ -343,7 +343,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -396,12 +396,12 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new DelimitedDataTupleParserFactory(custValueParserFactories,
'|'), custDesc);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec,
custScanner, NC1_ID, NC2_ID);
- OptimizedHybridHashJoinOperatorDescriptor join =
- new OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20,
1.2, new int[] { 1 }, new int[] { 0 },
- new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
- custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null,
false, null);
+ OptimizedHybridHashJoinOperatorDescriptor join = new
OptimizedHybridHashJoinOperatorDescriptor(spec, 5, 20, 1.2,
+ new int[] { 1 }, new int[] { 0 },
+ new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE },
+ new IBinaryHashFunctionFamily[] {
MurmurHash3BinaryHashFunctionFamily.INSTANCE }, custOrderJoinDesc,
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1), null,
null, false, null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID, NC2_ID);
@@ -460,7 +460,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addPartitionCountConstraint(spec, join, 2);
ResultSetId rsId = new ResultSetId(1);
@@ -524,7 +524,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0),
custOrderJoinDesc, 128,
- null, 128);
+ 128);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID, NC2_ID);
ResultSetId rsId = new ResultSetId(1);
@@ -581,7 +581,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
@@ -629,7 +629,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
@@ -678,7 +678,7 @@ public class TPCHCustomerOrderHashJoinTest extends
AbstractIntegrationTest {
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
custOrderJoinDesc, new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
null);
PartitionConstraintHelper.addAbsoluteLocationConstraint(spec, join,
NC1_ID);
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
index 3bbeca8..a351c85 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/tpch-example/tpchclient/src/main/java/org/apache/hyracks/examples/tpch/client/Join.java
@@ -185,7 +185,7 @@ public class Join {
new IBinaryHashFunctionFactory[] {
PointableBinaryHashFunctionFactory.of(UTF8StringPointable.FACTORY) },
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- Common.custOrderJoinDesc, tableSize, null, memSize *
frameSize);
+ Common.custOrderJoinDesc, tableSize, memSize * frameSize);
} else if ("hybrid".equalsIgnoreCase(algo)) {
join = new OptimizedHybridHashJoinOperatorDescriptor(spec,
memSize, graceInputSize, graceFactor,
@@ -194,7 +194,7 @@ public class Join {
new IBinaryHashFunctionFamily[] {
UTF8StringBinaryHashFunctionFamily.INSTANCE },
Common.custOrderJoinDesc,
new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 0, 1),
- new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null);
+ new
JoinComparatorFactory(UTF8StringBinaryComparatorFactory.INSTANCE, 1, 0), null,
null);
} else {
System.err.println("unknown algorithm:" + algo);