TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.
Closes #184 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/2eba8aa3 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/2eba8aa3 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/2eba8aa3 Branch: refs/heads/block_iteration Commit: 2eba8aa30e1266e15ad918e4b150259bff6b31df Parents: d0f9ebc Author: HyoungJun Kim <[email protected]> Authored: Wed Oct 8 08:58:18 2014 +0900 Committer: HyoungJun Kim <[email protected]> Committed: Wed Oct 8 08:58:18 2014 +0900 ---------------------------------------------------------------------- CHANGES | 2 ++ .../tajo/engine/query/TestJoinBroadcast.java | 32 ++++++++++++++++++++ ...tMultipleBroadcastDataFileWithZeroLength.sql | 3 ++ ...ltipleBroadcastDataFileWithZeroLength.result | 2 ++ .../org/apache/tajo/storage/MergeScanner.java | 18 +++++++---- 5 files changed, 51 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/2eba8aa3/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index eb90021..de21976 100644 --- a/CHANGES +++ b/CHANGES @@ -161,6 +161,8 @@ Release 0.9.0 - unreleased BUG FIXES + TAJO-1101: Broadcast join with a zero-length file table returns wrong result data.(Hyoungjun Kim) + TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. (jaehwa) TAJO-1065: The \admin -cluster argument doesn't run as expected. http://git-wip-us.apache.org/repos/asf/tajo/blob/2eba8aa3/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java index 2625136..768d5aa 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestJoinBroadcast.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.query; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.*; import org.apache.tajo.catalog.*; @@ -39,6 +40,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import java.io.File; +import java.io.OutputStream; import java.sql.ResultSet; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; @@ -646,5 +648,35 @@ public class TestJoinBroadcast extends QueryTestCaseBase { } + @Test + public void testMultipleBroadcastDataFileWithZeroLength() throws Exception { + createMultiFile("nation", 2, new TupleCreator() { + public Tuple createTuple(String[] columnDatas) { + return new VTuple(new Datum[]{ + new Int4Datum(Integer.parseInt(columnDatas[0])), + new TextDatum(columnDatas[1]), + new Int4Datum(Integer.parseInt(columnDatas[2])), + new TextDatum(columnDatas[3]) + }); + } + }); + addEmptyDataFile("nation"); + ResultSet res = executeQuery(); + + assertResultSet(res); + cleanupQuery(res); + + executeString("DROP TABLE nation_multifile PURGE"); + } + + private void addEmptyDataFile(String tableName) throws Exception { + String multiTableName = tableName + "_multifile"; + TableDesc table = client.getTableDesc(multiTableName); + + Path dataPath = new Path(table.getPath(), 999999 + "_empty.csv"); + FileSystem fs = dataPath.getFileSystem(conf); + OutputStream out = fs.create(dataPath); + out.close(); + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/2eba8aa3/tajo-core/src/test/resources/queries/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.sql ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/queries/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.sql b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.sql new file mode 100644 index 0000000..83ddf34 --- /dev/null +++ b/tajo-core/src/test/resources/queries/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.sql @@ -0,0 +1,3 @@ +select * from customer_large a + left outer join nation_multifile b on a.c_nationkey = b.n_nationkey + where b.n_nationkey is null \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2eba8aa3/tajo-core/src/test/resources/results/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.result ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/resources/results/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.result b/tajo-core/src/test/resources/results/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.result new file mode 100644 index 0000000..a9ffa3b --- /dev/null +++ b/tajo-core/src/test/resources/results/TestJoinBroadcast/testMultipleBroadcastDataFileWithZeroLength.result @@ -0,0 +1,2 @@ +c_custkey,c_name,c_address,c_nationkey,c_phone,c_acctbal,c_mktsegment,c_comment,n_nationkey,n_name,n_regionkey,n_comment +------------------------------- \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/2eba8aa3/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java index 8917f21..637df2c 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java @@ -29,6 +29,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.storage.fragment.FileFragment; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -60,8 +61,17 @@ public class MergeScanner implements Scanner { this.meta = meta; this.target = target; + this.fragments = new ArrayList<FileFragment>(); + + long numBytes = 0; + for (FileFragment eachFileFragment: rawFragmentList) { + numBytes += eachFileFragment.getEndKey(); + if (eachFileFragment.getEndKey() > 0) { + fragments.add(eachFileFragment); + } + } + // it should keep the input order. Otherwise, it causes wrong result of sort queries. - this.fragments = ImmutableList.copyOf(rawFragmentList); this.reset(); if (currentScanner != null) { @@ -70,13 +80,9 @@ public class MergeScanner implements Scanner { } tableStats = new TableStats(); - long numBytes = 0; - for (FileFragment eachFileFragment: rawFragmentList) { - numBytes += (eachFileFragment.getEndKey() - eachFileFragment.getStartKey()); - } tableStats.setNumBytes(numBytes); - tableStats.setNumBlocks(rawFragmentList.size()); + tableStats.setNumBlocks(fragments.size()); for(Column eachColumn: schema.getColumns()) { ColumnStats columnStats = new ColumnStats(eachColumn);
