TAJO-879: Some data is missing in the case of BROADCAST JOIN and multi-column partition. (Hyoungjun Kim via jaehwa)
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8883f9fc Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8883f9fc Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8883f9fc Branch: refs/heads/window_function Commit: 8883f9fc28a51aa9db6242206f75db49043e176b Parents: 1613cd2 Author: blrunner <[email protected]> Authored: Mon Jun 23 14:13:54 2014 +0900 Committer: blrunner <[email protected]> Committed: Mon Jun 23 14:13:54 2014 +0900 ---------------------------------------------------------------------- CHANGES | 3 ++ .../engine/planner/physical/SeqScanExec.java | 4 +-- .../tajo/master/querymaster/SubQuery.java | 1 + .../tajo/engine/query/TestJoinBroadcast.java | 36 +++++++++++++++----- ...estBroadcastMultiColumnPartitionTable.result | 5 +++ 5 files changed, 39 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/8883f9fc/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 74f016b..3f3c9c9 100644 --- a/CHANGES +++ b/CHANGES @@ -71,6 +71,9 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-879: Some data is missing in the case of BROADCAST JOIN and multi-column partition. + (Hyoungjun Kim via jaehwa) + TAJO-848: PreLogicalPlanVerifier::visitInsert need to find smaller expressions than target columns for a partitioned table. (jaehwa) http://git-wip-us.apache.org/repos/asf/tajo/blob/8883f9fc/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 0a2b279..a45cd7b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -78,9 +78,9 @@ public class SeqScanExec extends PhysicalExec { String pathNameKey = ""; if (fragments != null) { for (FragmentProto f : fragments) { - FileFragment fileFragement = (FileFragment) FragmentConvertor.convert( + FileFragment fileFragement = FragmentConvertor.convert( context.getConf(), plan.getTableDesc().getMeta().getStoreType(), f); - pathNameKey += fileFragement.getPath().getParent().getName(); + pathNameKey += fileFragement.getPath(); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/8883f9fc/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 22817bd..be0c624 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -907,6 +907,7 @@ public class SubQuery implements EventHandler<SubQueryEvent> { // Otherwise, it creates at least one fragments for a table, which may // span a number of blocks or possibly consists of a number of files. if (scan.getType() == NodeType.PARTITIONS_SCAN) { + // After calling this method, partition paths are removed from the physical plan. fragments = Repartitioner.getFragmentsFromPartitionedTable(subQuery.getStorageManager(), scan, table); } else { Path inputPath = table.getPath(); http://git-wip-us.apache.org/repos/asf/tajo/blob/8883f9fc/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index ae87c64..2e3b899 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -21,13 +21,8 @@ package org.apache.tajo.engine.query; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.IntegrationTest; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.*; +import org.apache.tajo.catalog.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.Int4Datum; @@ -50,6 +45,7 @@ import java.io.File; import java.sql.ResultSet; import static junit.framework.TestCase.*; +import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.assertNotNull; @Category(IntegrationTest.class) @@ -391,7 +387,6 @@ public class TestJoinBroadcast extends QueryTestCaseBase { @Test public final void testBroadcastPartitionTable() throws Exception { - // https://issues.apache.org/jira/browse/TAJO-839 // If all tables participate in the BROADCAST JOIN, there is some missing data. executeDDL("customer_partition_ddl.sql", null); ResultSet res = executeFile("insert_into_customer_partition.sql"); @@ -427,6 +422,31 @@ public class TestJoinBroadcast extends QueryTestCaseBase { executeString("DROP TABLE orders_multifile PURGE"); } + @Test + public final void testBroadcastMultiColumnPartitionTable() throws Exception { + String tableName = CatalogUtil.normalizeIdentifier("testBroadcastMultiColumnPartitionTable"); + ResultSet res = testBase.execute( + "create table " + tableName + " (col1 int4, col2 float4) partition by column(col3 text, col4 text) "); + res.close(); + TajoTestingCluster cluster = testBase.getTestingCluster(); + CatalogService catalog = cluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable(DEFAULT_DATABASE_NAME, tableName)); + + res = executeString("insert overwrite into " + tableName + + " select o_orderkey, o_totalprice, substr(o_orderdate, 6, 2), substr(o_orderdate, 1, 4) from orders"); + res.close(); + + res = executeString( + "select distinct a.col3 from " + tableName + " as a " + + "left outer join lineitem_large b " + + "on a.col1 = b.l_orderkey" + ); + + assertResultSet(res); + cleanupQuery(res); + } + + static interface TupleCreator { public Tuple createTuple(String[] columnDatas); } http://git-wip-us.apache.org/repos/asf/tajo/blob/8883f9fc/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result new file mode 100644 index 0000000..df3c7bc --- /dev/null +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testBroadcastMultiColumnPartitionTable.result @@ -0,0 +1,5 @@ +col3 +------------------------------- +01 +10 +12 \ No newline at end of file
