Repository: tajo Updated Branches: refs/heads/master 326be451d -> 72808e06f
TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly using auto broadcast join. (jaehwa) Closes #88 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/72808e06 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/72808e06 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/72808e06 Branch: refs/heads/master Commit: 72808e06f02cbb0bd7d9cf345544c60205cf34b0 Parents: 326be45 Author: blrunner <[email protected]> Authored: Thu Jul 24 11:27:18 2014 +0900 Committer: blrunner <[email protected]> Committed: Thu Jul 24 11:27:18 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../apache/tajo/worker/TaskAttemptContext.java | 26 +++++++++++- .../tajo/engine/query/TestJoinBroadcast.java | 42 +++++++++++++++++--- 3 files changed, 64 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index a67625d..08cf60a 100644 --- a/CHANGES +++ b/CHANGES @@ -97,6 +97,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-968: Self-Join query (including partitioned table) doesn't run unexpectedly + using auto broadcast join. (jaewha) + TAJO-914: join queries with constant values can cause schema mismatch in logical plan. 
(hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 1f0c410..db4af45 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -31,8 +31,10 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.TUtil; import java.io.File; import java.util.*; @@ -234,10 +236,30 @@ public class TaskAttemptContext { tableFragments = new ArrayList<FragmentProto>(); } + List<Path> paths = fragmentToPath(tableFragments); + for (FragmentProto eachFragment: fragments) { - tableFragments.add(eachFragment); + FileFragment fileFragment = FragmentConvertor.convert(FileFragment.class, eachFragment); + // If current attempt already has same path, we don't need to add it to fragments. 
+ if (!paths.contains(fileFragment.getPath())) { + tableFragments.add(eachFragment); + } } - fragmentMap.put(tableId, tableFragments); + + if (tableFragments.size() > 0) { + fragmentMap.put(tableId, tableFragments); + } + } + + private List<Path> fragmentToPath(List<FragmentProto> tableFragments) { + List<Path> list = TUtil.newList(); + + for (FragmentProto proto : tableFragments) { + FileFragment fragment = FragmentConvertor.convert(FileFragment.class, proto); + list.add(fragment.getPath()); + } + + return list; } public Path getWorkDir() { http://git-wip-us.apache.org/repos/asf/tajo/blob/72808e06/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 9cc65bc..5df6f24 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -18,8 +18,6 @@ package org.apache.tajo.engine.query; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.*; import org.apache.tajo.catalog.*; @@ -43,13 +41,11 @@ import org.junit.experimental.categories.Category; import java.io.File; import java.sql.ResultSet; -import static junit.framework.TestCase.*; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; -import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.*; @Category(IntegrationTest.class) public class TestJoinBroadcast extends QueryTestCaseBase { - private static final Log LOG = LogFactory.getLog(TestJoinBroadcast.class); public TestJoinBroadcast() throws Exception { super(TajoConstants.DEFAULT_DATABASE_NAME); 
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.DIST_QUERY_BROADCAST_JOIN_AUTO.varname, "true"); @@ -615,4 +611,40 @@ public class TestJoinBroadcast extends QueryTestCaseBase { executeString("DROP TABLE table_large PURGE").close(); } } + + + @Test + public final void testSelfJoin() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("paritioned_nation"); + ResultSet res = executeString( + "create table " + tableName + " (n_name text," + + " n_comment text, n_regionkey int8) USING csv " + + "WITH ('csvfile.delimiter'='|')" + + "PARTITION BY column(n_nationkey int8)"); + res.close(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + res = executeString( + "insert overwrite into " + tableName + + " select n_name, n_comment, n_regionkey, n_nationkey from nation"); + res.close(); + + res = executeString( + "select a.n_nationkey, a.n_name from nation a join nation b on a.n_nationkey = b.n_nationkey" + + " where a.n_nationkey in (1)"); + String expected = resultSetToString(res); + res.close(); + + res = executeString( + "select a.n_nationkey, a.n_name from " + tableName + " a join "+tableName + + " b on a.n_nationkey = b.n_nationkey " + + " where a.n_nationkey in (1)"); + String resultSetData = resultSetToString(res); + res.close(); + + assertEquals(expected, resultSetData); + + } + + }
