Repository: tajo Updated Branches: refs/heads/master ea5ce54d8 -> f3d63b46b
http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java index a184a9a..e4d8b0b 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/planner/physical/TestPhysicalPlanner.java @@ -25,15 +25,13 @@ import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.tajo.LocalTajoTestingUtility; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.datum.Datum; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; @@ -64,6 +62,7 @@ import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; @@ -534,9 +533,8 @@ public class TestPhysicalPlanner { FileFragment[] frags = StorageManager.splitNG(conf, "default.score", score.getMeta(), score.getPath(), Integer.MAX_VALUE); QueryUnitAttemptId id = LocalTajoTestingUtility.newQueryUnitAttemptId(masterPlan); - Path workDir = CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan"); - TaskAttemptContext ctx = new TaskAttemptContext(new 
QueryContext(conf), - id, new FileFragment[] { frags[0] }, workDir); + TaskAttemptContext ctx = new TaskAttemptContext(new QueryContext(conf), id, new FileFragment[] { frags[0] }, + CommonTestingUtil.getTestDir("target/test-data/testPartitionedStorePlan")); ctx.setEnforcer(new Enforcer()); Expr context = analyzer.parse(QUERIES[7]); LogicalPlan plan = planner.createPlan(defaultContext, context); @@ -553,27 +551,35 @@ public class TestPhysicalPlanner { TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType()); FileSystem fs = sm.getFileSystem(); + QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId(); + ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId(); PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); exec.close(); + ctx.getHashShuffleAppenderManager().close(ebId); - Path path = new Path(workDir, "output"); - FileStatus [] list = fs.listStatus(path); - assertEquals(numPartitions, list.length); + String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; + Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); + FileStatus [] list = fs.listStatus(queryLocalTmpDir); - FileFragment[] fragments = new FileFragment[list.length]; - int i = 0; + List<FileFragment> fragments = new ArrayList<FileFragment>(); for (FileStatus status : list) { - fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen()); + assertTrue(status.isDirectory()); + FileStatus [] files = fs.listStatus(status.getPath()); + for (FileStatus eachFile: files) { + fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen())); + } } + + assertEquals(numPartitions, fragments.size()); Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); scanner.init(); Tuple tuple; - i = 
0; + int i = 0; while ((tuple = scanner.next()) != null) { assertEquals(6, tuple.get(2).asInt4()); // sum assertEquals(3, tuple.get(3).asInt4()); // max @@ -585,6 +591,8 @@ public class TestPhysicalPlanner { // Examine the statistics information assertEquals(10, ctx.getResultStats().getNumRows().longValue()); + + fs.delete(queryLocalTmpDir, true); } @Test @@ -611,27 +619,36 @@ public class TestPhysicalPlanner { TableMeta outputMeta = CatalogUtil.newTableMeta(dataChannel.getStoreType()); + FileSystem fs = sm.getFileSystem(); + QueryId queryId = id.getQueryUnitId().getExecutionBlockId().getQueryId(); + ExecutionBlockId ebId = id.getQueryUnitId().getExecutionBlockId(); + PhysicalPlanner phyPlanner = new PhysicalPlannerImpl(conf,sm); PhysicalExec exec = phyPlanner.createPlan(ctx, rootNode); exec.init(); exec.next(); exec.close(); + ctx.getHashShuffleAppenderManager().close(ebId); - Path path = new Path(workDir, "output"); - FileSystem fs = sm.getFileSystem(); - - FileStatus [] list = fs.listStatus(path); - assertEquals(numPartitions, list.length); + String executionBlockBaseDir = queryId.toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle"; + Path queryLocalTmpDir = new Path(conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + "/" + executionBlockBaseDir); + FileStatus [] list = fs.listStatus(queryLocalTmpDir); - FileFragment[] fragments = new FileFragment[list.length]; - int i = 0; + List<FileFragment> fragments = new ArrayList<FileFragment>(); for (FileStatus status : list) { - fragments[i++] = new FileFragment("partition", status.getPath(), 0, status.getLen()); + assertTrue(status.isDirectory()); + FileStatus [] files = fs.listStatus(status.getPath()); + for (FileStatus eachFile: files) { + fragments.add(new FileFragment("partition", eachFile.getPath(), 0, eachFile.getLen())); + } } + + assertEquals(numPartitions, fragments.size()); + Scanner scanner = new MergeScanner(conf, rootNode.getOutSchema(), outputMeta, TUtil.newList(fragments)); scanner.init(); Tuple tuple; 
- i = 0; + int i = 0; while ((tuple = scanner.next()) != null) { assertEquals(60, tuple.get(0).asInt4()); // sum assertEquals(3, tuple.get(1).asInt4()); // max @@ -643,6 +660,7 @@ public class TestPhysicalPlanner { // Examine the statistics information assertEquals(1, ctx.getResultStats().getNumRows().longValue()); + fs.delete(queryLocalTmpDir, true); } @Test http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java index ffb1915..fccec26 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestGroupByQuery.java @@ -687,12 +687,8 @@ public class TestGroupByQuery extends QueryTestCaseBase { assertTrue(!subQueries.isEmpty()); for (SubQuery subQuery: subQueries) { if (subQuery.getId().toStringNoPrefix().endsWith("_000001")) { - QueryUnit[] queryUnits = subQuery.getQueryUnits(); - assertNotNull(queryUnits); - for (QueryUnit eachQueryUnit: queryUnits) { - for (ShuffleFileOutput output: eachQueryUnit.getShuffleFileOutputs()) { - partitionIds.add(output.getPartId()); - } + for (QueryUnit.IntermediateEntry eachInterm: subQuery.getHashShuffleIntermediateEntries()) { + partitionIds.add(eachInterm.getPartId()); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index d80fdb5..5bf2944 100644 --- 
a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -31,25 +31,36 @@ import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.planner.logical.NodeType; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.jdbc.TajoResultSet; +import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; +import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.storage.StorageConstants; +import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; import java.io.IOException; import java.sql.ResultSet; +import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Random; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType.SCATTERED_HASH_SHUFFLE; import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; public class TestTablePartitions extends QueryTestCaseBase { @@ -788,6 +799,67 @@ public class TestTablePartitions extends QueryTestCaseBase { } @Test + public void testScatteredHashShuffle() throws Exception { + testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, "2"); + 
testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname, "1"); + try { + KeyValueSet tableOptions = new KeyValueSet(); + tableOptions.set(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER); + tableOptions.set(StorageConstants.CSVFILE_NULL, "\\\\N"); + + Schema schema = new Schema(); + schema.addColumn("col1", TajoDataTypes.Type.TEXT); + schema.addColumn("col2", TajoDataTypes.Type.TEXT); + + List<String> data = new ArrayList<String>(); + int totalBytes = 0; + Random rand = new Random(System.currentTimeMillis()); + String col2Data = "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" + + "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2" + + "Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2Column-2"; + + int index = 0; + while(true) { + int col1RandomValue = 1; + String str = col1RandomValue + "|col2-" + index + "-" + col2Data; + data.add(str); + + totalBytes += str.getBytes().length; + + if (totalBytes > 4 * 1024 * 1024) { + break; + } + index++; + } + + TajoTestingCluster.createTable("testscatteredhashshuffle", schema, tableOptions, data.toArray(new String[]{}), 3); + CatalogService catalog = testingCluster.getMaster().getCatalog(); + assertTrue(catalog.existsTable("default", "testscatteredhashshuffle")); + + executeString("create table test_partition (col2 text) partition by column (col1 text)").close(); + executeString("insert into test_partition select col2, col1 from testscatteredhashshuffle").close(); + + ResultSet res = executeString("select col1 from test_partition"); + + int numRows = 0; + while (res.next()) { + numRows++; + } + assertEquals(data.size(), numRows); + + // assert data file size + + } finally { + testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, + TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal); 
+ testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.varname, + TajoConf.ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME.defaultVal); + executeString("DROP TABLE test_partition PURGE").close(); + executeString("DROP TABLE testScatteredHashShuffle PURGE").close(); + } + } + + @Test public final void TestSpecialCharPartitionKeys1() throws Exception { // See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters. http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java index 29aeccd..f969a08 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -26,9 +26,9 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TestTajoIds; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.querymaster.QueryUnit; +import org.apache.tajo.master.querymaster.QueryUnit.IntermediateEntry; import org.apache.tajo.master.querymaster.Repartitioner; import org.apache.tajo.util.Pair; -import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.junit.Test; @@ -48,45 +48,62 @@ public class TestRepartitioner { String hostName = "tajo1"; int port = 1234; ExecutionBlockId sid = new ExecutionBlockId(q1, 2); - int partitionId = 2; + int numPartition = 10; - List<QueryUnit.IntermediateEntry> intermediateEntries = TUtil.newList(); + Map<Integer, List<IntermediateEntry>> intermediateEntries = new HashMap<Integer, List<IntermediateEntry>>(); + for (int i = 0; i < numPartition; i++) { + intermediateEntries.put(i, new 
ArrayList<IntermediateEntry>()); + } for (int i = 0; i < 1000; i++) { - intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port))); + int partitionId = i % numPartition; + IntermediateEntry entry = new IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port)); + entry.setEbId(sid); + entry.setVolume(10); + intermediateEntries.get(partitionId).add(entry); } - FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE, - sid, partitionId, intermediateEntries); + Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> hashEntries = + new HashMap<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>>(); + + for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) { + FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE, + sid, eachEntry.getKey(), eachEntry.getValue()); + + fetch.setName(sid.toString()); + + TajoWorkerProtocol.FetchProto proto = fetch.getProto(); + fetch = new FetchImpl(proto); + assertEquals(proto, fetch.getProto()); - fetch.setName(sid.toString()); + Map<ExecutionBlockId, List<IntermediateEntry>> ebEntries = new HashMap<ExecutionBlockId, List<IntermediateEntry>>(); + ebEntries.put(sid, eachEntry.getValue()); - TajoWorkerProtocol.FetchProto proto = fetch.getProto(); - fetch = new FetchImpl(proto); - assertEquals(proto, fetch.getProto()); - List<URI> uris = fetch.getURIs(); + hashEntries.put(eachEntry.getKey(), ebEntries); - List<String> taList = TUtil.newList(); - for (URI uri : uris) { + List<URI> uris = fetch.getURIs(); + assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition. 
+ + URI uri = uris.get(0); final Map<String, List<String>> params = new QueryStringDecoder(uri).getParameters(); - taList.addAll(splitMaps(params.get("ta"))); - } - int checkTaskId = 0; - for (String ta : taList) { - assertEquals(checkTaskId++, Integer.parseInt(ta.split("_")[0])); + assertEquals(eachEntry.getKey().toString(), params.get("p").get(0)); + assertEquals("h", params.get("type").get(0)); + assertEquals("" + sid.getId(), params.get("sid").get(0)); } - } - private List<String> splitMaps(List<String> mapq) { - if (null == mapq) { - return null; - } - final List<String> ret = new ArrayList<String>(); - for (String s : mapq) { - Collections.addAll(ret, s.split(",")); + Map<Integer, Map<ExecutionBlockId, List<IntermediateEntry>>> mergedHashEntries = + Repartitioner.mergeIntermediateByPullHost(hashEntries); + + assertEquals(numPartition, mergedHashEntries.size()); + for (int i = 0; i < numPartition; i++) { + Map<ExecutionBlockId, List<IntermediateEntry>> eachEntry = mergedHashEntries.get(0); + assertEquals(1, eachEntry.size()); + List<IntermediateEntry> interEntry = eachEntry.get(sid); + assertEquals(1, interEntry.size()); + + assertEquals(1000, interEntry.get(0).getVolume()); } - return ret; } @Test @@ -148,6 +165,244 @@ public class TestRepartitioner { } } + @Test + public void testMergeIntermediates() { + //Test Merge + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = {10 * 1024 * 1024, 10 * 1024 * 1024, 10 * 1024 * 1024, 5 * 1024 * 1024}; //35 MB + long expectedTotalLength = 0; + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, -1, new QueryUnit.PullHost("" + i, i)); + interm.setPages(pages); + 
interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(6, fetches.size()); + + int totalInterms = 0; + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + for (List<FetchImpl> eachFetchList: fetches) { + totalInterms += eachFetchList.size(); + long eachFetchVolume = 0; + for (FetchImpl eachFetch: eachFetchList) { + eachFetchVolume += eachFetch.getLength(); + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + } + assertTrue(eachFetchVolume + " should be smaller than splitVolume", eachFetchVolume < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(eachFetchVolume + " should be great than 100MB", eachFetchVolume >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(23, totalInterms); + assertEquals(20, numZeroPosFetcher); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testSplitIntermediates() { + List<IntermediateEntry> intermediateEntries = new ArrayList<IntermediateEntry>(); + + int[] pageLengths = new int[20]; //195MB + for (int i = 0 ; i < pageLengths.length; i++) { + if (i < pageLengths.length - 1) { + pageLengths[i] = 10 * 1024 * 1024; + } else { + pageLengths[i] = 5 * 1024 * 1024; + } + } + + long expectedTotalLength = 0; + for (int i = 0; i < 20; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + long offset = 0; + for (int j = 0; j < pageLengths.length; j++) { + pages.add(new Pair(offset, pageLengths[j])); + offset += pageLengths[j]; + expectedTotalLength += pageLengths[j]; + } + IntermediateEntry interm = new IntermediateEntry(i, -1, 0, new QueryUnit.PullHost("" + i, i)); + interm.setPages(pages); + interm.setVolume(offset); + intermediateEntries.add(interm); + } + + long splitVolume = 128 * 1024 * 1024; + 
List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + splitVolume, 10 * 1024 * 1024); + assertEquals(32, fetches.size()); + + int index = 0; + int numZeroPosFetcher = 0; + long totalLength = 0; + Set<String> uniqPullHost = new HashSet<String>(); + + for (List<FetchImpl> eachFetchList: fetches) { + long length = 0; + for (FetchImpl eachFetch: eachFetchList) { + if (eachFetch.getOffset() == 0) { + numZeroPosFetcher++; + } + totalLength += eachFetch.getLength(); + length += eachFetch.getLength(); + uniqPullHost.add(eachFetch.getPullHost().toString()); + } + assertTrue(length + " should be smaller than splitVolume", length < splitVolume); + if (index < fetches.size() - 1) { + assertTrue(length + " should be great than 100MB" + fetches.size() + "," + index, length >= 100 * 1024 * 1024); + } + index++; + } + assertEquals(20, numZeroPosFetcher); + assertEquals(20, uniqPullHost.size()); + assertEquals(expectedTotalLength, totalLength); + } + + @Test + public void testSplitIntermediates2() { + long[][] pageDatas = { + {0, 10538717}, + {10538717, 10515884}, + {21054601, 10514343}, + {31568944, 10493988}, + {42062932, 10560639}, + {52623571, 10548486}, + {63172057, 10537811}, + {73709868, 10571060}, + {84280928, 10515062}, + {94795990, 10502964}, + {105298954, 10514011}, + {115812965, 10532154}, + {126345119, 10534133}, + {136879252, 10549749}, + {147429001, 10566547}, + {157995548, 10543700}, + {168539248, 10490324}, + {179029572, 10500720}, + {189530292, 10505425}, + {200035717, 10548418}, + {210584135, 10562887}, + {221147022, 10554967}, + {231701989, 10507297}, + {242209286, 10515612}, + {252724898, 10491274}, + {263216172, 10512956}, + {273729128, 10490736}, + {284219864, 10501878}, + {294721742, 10564568}, + {305286310, 10488896}, + {315775206, 10516308}, + {326291514, 10517965}, + {336809479, 10487038}, + {347296517, 10603472}, + {357899989, 10507330}, + {368407319, 10549429}, + {378956748, 10533443}, + {389490191, 
10530852}, + {400021043, 11036431}, + {411057474, 10541007}, + {421598481, 10600477}, + {432198958, 10519805}, + {442718763, 10500769}, + {453219532, 10507192}, + {463726724, 10540424}, + {474267148, 10509129}, + {484776277, 10527100}, + {495303377, 10720789}, + {506024166, 10568542}, + {516592708, 11046886}, + {527639594, 10580358}, + {538219952, 10508940}, + {548728892, 10523968}, + {559252860, 10580626}, + {569833486, 10539361}, + {580372847, 10496662}, + {590869509, 10505280}, + {601374789, 10564655}, + {611939444, 10505842}, + {622445286, 10523889}, + {632969175, 10553186}, + {643522361, 10535866}, + {654058227, 10501796}, + {664560023, 10530358}, + {675090381, 10585340}, + {685675721, 10602017}, + {696277738, 10546614}, + {706824352, 10511511}, + {717335863, 11019221}, + {728355084, 10558143}, + {738913227, 10516245}, + {749429472, 10502613}, + {759932085, 10522145}, + {770454230, 10489373}, + {780943603, 10520973}, + {791464576, 11021218}, + {802485794, 10496362}, + {812982156, 10502354}, + {823484510, 10515932}, + {834000442, 10591044}, + {844591486, 5523957} + }; + + List<IntermediateEntry> entries = new ArrayList<IntermediateEntry>(); + for (int i = 0; i < 2; i++) { + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + for (int j = 0; j < pageDatas.length; j++) { + pages.add(new Pair(pageDatas[j][0], (int) (pageDatas[j][1]))); + } + IntermediateEntry entry = new IntermediateEntry(-1, -1, 1, new QueryUnit.PullHost("host" + i , 9000)); + entry.setPages(pages); + + entries.add(entry); + } + + long splitVolume = 256 * 1024 * 1024; + List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume, + 10 * 1024 * 1024); + + + long[][] expected = { + {0,263216172}, + {263216172,264423422}, + {527639594,263824982}, + {791464576,58650867}, + {0,200035717}, + {200035717,263691007}, + {463726724,264628360}, + {728355084,121760359}, + }; + int index = 0; + for (List<FetchImpl> eachFetchList: fetches) { + if 
(index == 3) { + assertEquals(2, eachFetchList.size()); + } else { + assertEquals(1, eachFetchList.size()); + } + for (FetchImpl eachFetch: eachFetchList) { + assertEquals(expected[index][0], eachFetch.getOffset()); + assertEquals(expected[index][1], eachFetch.getLength()); + index++; + } + } + } + private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) { Set<FetchImpl> expectedURLs = Sets.newHashSet(); http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java new file mode 100644 index 0000000..114b232 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestIntermediateEntry.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master.querymaster; + +import org.apache.tajo.util.Pair; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class TestIntermediateEntry { + @Test + public void testPage() { + QueryUnit.IntermediateEntry interm = new QueryUnit.IntermediateEntry(-1, -1, 1, null); + + List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>(); + pages.add(new Pair(0L, 1441275)); + pages.add(new Pair(1441275L, 1447446)); + pages.add(new Pair(2888721L, 1442507)); + + interm.setPages(pages); + + long splitBytes = 3 * 1024 * 1024; + + List<Pair<Long, Long>> splits = interm.split(splitBytes, splitBytes); + assertEquals(2, splits.size()); + + long[][] expected = { {0, 1441275 + 1447446}, {1441275 + 1447446, 1442507} }; + for (int i = 0; i < 2; i++) { + Pair<Long, Long> eachSplit = splits.get(i); + assertEquals(expected[i][0], eachSplit.getFirst().longValue()); + assertEquals(expected[i][1], eachSplit.getSecond().longValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java index b4be00b..d64e4c7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryUnitStatusUpdate.java @@ -57,7 +57,7 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { // tpch/lineitem.tbl long[] expectedNumRows = new long[]{5, 2, 2, 2}; long[] expectedNumBytes = new long[]{604, 18, 18, 8}; - long[] expectedReadBytes = new long[]{604, 0, 18, 0}; + long[] expectedReadBytes = new 
long[]{604, 604, 18, 0}; assertStatus(2, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { @@ -75,7 +75,7 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { // tpch/lineitem.tbl long[] expectedNumRows = new long[]{5, 2, 2, 2, 2, 2}; long[] expectedNumBytes = new long[]{604, 162, 162, 138, 138, 194}; - long[] expectedReadBytes = new long[]{604, 0, 162, 0, 138, 0}; + long[] expectedReadBytes = new long[]{604, 604, 162, 0, 138, 0}; assertStatus(3, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { @@ -106,7 +106,7 @@ public class TestQueryUnitStatusUpdate extends QueryTestCaseBase { // in/out * subquery(4) long[] expectedNumRows = new long[]{2, 2, 5, 5, 7, 2, 2, 2}; long[] expectedNumBytes = new long[]{8, 34, 20, 75, 109, 34, 34, 18}; - long[] expectedReadBytes = new long[]{8, 0, 20, 0, 109, 0, 34, 0}; + long[] expectedReadBytes = new long[]{8, 8, 20, 20, 109, 0, 34, 0}; assertStatus(4, expectedNumRows, expectedNumBytes, expectedReadBytes); } finally { http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java index c13842b..eb63c27 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -23,8 +23,10 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.CommonTestingUtil; import 
org.jboss.netty.channel.socket.ClientSocketChannelFactory; import org.junit.After; @@ -73,11 +75,13 @@ public class TestFetcher { Random rnd = new Random(); QueryId queryId = QueryIdFactory.NULL_QUERY_ID; String sid = "1"; - String ta = "1_0"; String partId = "1"; - String dataPath = INPUT_DIR + queryId.toString() + "/output"+ "/" + sid + "/" +ta + "/output/" + partId; - String params = String.format("qid=%s&sid=%s&p=%s&type=%s&ta=%s", queryId, sid, partId, "h", ta); + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), conf); + String dataPath = conf.getVar(ConfVars.WORKER_TEMPORAL_DIR) + + queryId.toString() + "/output/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; + + String params = String.format("qid=%s&sid=%s&p=%s&type=%s", queryId, sid, partId, "h"); Path inputPath = new Path(dataPath); FSDataOutputStream stream = LocalFileSystem.get(conf).create(inputPath, true); http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java index a2dda76..c6be73b 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/AbstractStorageManager.java @@ -172,7 +172,6 @@ public abstract class AbstractStorageManager { return appender; } - public TableMeta getTableMeta(Path tablePath) throws IOException { TableMeta meta; http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
new file mode 100644
index 0000000..934fd94
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Appender for a single hash-shuffle partition file. Several task attempts may append to the same
+ * instance, so writes and the per-task bookkeeping synchronize on the wrapped {@link FileAppender}.
+ *
+ * <p>The file is logically divided into pages: when the bytes written since the start of the current
+ * page exceed {@code pageSize}, the page is closed and a new one begins at the current file offset.
+ * For every batch passed to {@link #addTuples(QueryUnitAttemptId, List)}, the start offset of the
+ * page and the row range inside that page are recorded for the writing task attempt.
+ * {@link #taskFinished(QueryUnitAttemptId)} drops the records of an attempt, so whatever remains
+ * belongs to attempts that were never marked finished.
+ */
+public class HashShuffleAppender implements Appender {
+  private static final Log LOG = LogFactory.getLog(HashShuffleAppender.class);
+
+  // The wrapped appender; its monitor also guards the mutable state of this class.
+  private FileAppender appender;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private int partId;
+
+  // Statistics captured from the wrapped appender when this appender is closed.
+  private TableStats tableStats;
+
+  //<taskId,<page start offset,<task start, task end>>>
+  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;
+
+  //page start offset, length
+  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();
+
+  private Pair<Long, Integer> currentPage;
+
+  // Page size threshold in bytes (compared against file offsets), not MB.
+  private int pageSize;
+
+  private int rowNumInPage;
+
+  private int totalRows;
+
+  private long offset;
+
+  private ExecutionBlockId ebId;
+
+  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
+    this.ebId = ebId;
+    this.partId = partId;
+    this.appender = appender;
+    this.pageSize = pageSize;
+  }
+
+  @Override
+  public void init() throws IOException {
+    currentPage = new Pair<Long, Integer>(0L, 0);
+    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
+    rowNumInPage = 0;
+  }
+
+  /**
+   * Writes a batch of tuples produced by one task attempt. If the current page grows beyond
+   * {@code pageSize} bytes after the write, a new page is started at the end of the written data.
+   *
+   * @param taskId the task attempt that produced the tuples
+   * @param tuples the tuples to write, in order
+   * @return the number of bytes written, or 0 if this appender is already closed
+   * @throws IOException if the wrapped appender fails to write
+   */
+  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return 0;
+      }
+      long currentPos = appender.getOffset();
+
+      for (Tuple eachTuple: tuples) {
+        appender.addTuple(eachTuple);
+      }
+      long posAfterWritten = appender.getOffset();
+
+      int writtenBytes = (int)(posAfterWritten - currentPos);
+
+      // Rows [rowNumInPage, nextRowNum) of the current page belong to this task attempt.
+      int nextRowNum = rowNumInPage + tuples.size();
+      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
+      if (taskIndexes == null) {
+        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+        taskTupleIndexes.put(taskId, taskIndexes);
+      }
+      taskIndexes.add(new Pair<Long, Pair<Integer, Integer>>(
+          currentPage.getFirst(), new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
+      rowNumInPage = nextRowNum;
+
+      if (posAfterWritten - currentPage.getFirst() > pageSize) {
+        nextPage(posAfterWritten);
+        rowNumInPage = 0;
+      }
+
+      totalRows += tuples.size();
+      return writtenBytes;
+    }
+  }
+
+  /**
+   * Returns the current file length, or the final length recorded by {@link #close()}.
+   */
+  public long getOffset() throws IOException {
+    if (closed.get()) {
+      return offset;
+    } else {
+      return appender.getOffset();
+    }
+  }
+
+  // Ends the current page at file position pos and starts a new, empty page there.
+  private void nextPage(long pos) {
+    currentPage.setSecond((int) (pos - currentPage.getFirst()));
+    pages.add(currentPage);
+    currentPage = new Pair<Long, Integer>(pos, 0);
+  }
+
+  @Override
+  public void addTuple(Tuple t) throws IOException {
+    throw new IOException("Not support addTuple, use addTuples()");
+  }
+
+  @Override
+  public void flush() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+    }
+  }
+
+  /**
+   * Flushes and closes the wrapped appender and ends the last, partially filled page. Subsequent
+   * calls return immediately.
+   */
+  @Override
+  public void close() throws IOException {
+    synchronized(appender) {
+      if (closed.get()) {
+        return;
+      }
+      appender.flush();
+      offset = appender.getOffset();
+      if (offset > currentPage.getFirst()) {
+        nextPage(offset);
+      }
+      appender.close();
+      if (LOG.isDebugEnabled()) {
+        if (!pages.isEmpty()) {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
+              + ", lastPage=" + pages.get(pages.size() - 1));
+        } else {
+          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
+        }
+      }
+      closed.set(true);
+      tableStats = appender.getStats();
+    }
+  }
+
+  @Override
+  public void enableStats() {
+  }
+
+  /**
+   * Returns the statistics of the written data. After {@link #close()}, the statistics captured at
+   * close time are returned instead of querying the closed appender.
+   */
+  @Override
+  public TableStats getStats() {
+    synchronized(appender) {
+      if (closed.get()) {
+        return tableStats;
+      }
+      return appender.getStats();
+    }
+  }
+
+  // NOTE(review): returns the live list, which addTuples() may still modify until close().
+  public List<Pair<Long, Integer>> getPages() {
+    return pages;
+  }
+
+  // NOTE(review): returns the live map; it is modified under the appender lock by addTuples()
+  // and taskFinished(), so callers iterating it concurrently must synchronize as well.
+  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
+    return taskTupleIndexes;
+  }
+
+  /**
+   * Returns, as one new list, the tuple indexes of every task attempt not yet marked finished.
+   */
+  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
+    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
+
+    // taskTupleIndexes is a plain HashMap, so read it under the same lock its writers hold.
+    synchronized(appender) {
+      for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex: taskTupleIndexes.values()) {
+        merged.addAll(eachFailureIndex);
+      }
+    }
+
+    return merged;
+  }
+
+  /**
+   * Drops the tuple indexes recorded for the given task attempt.
+   */
+  public void taskFinished(QueryUnitAttemptId taskId) {
+    // Without the lock this HashMap removal could race with put() in addTuples().
+    synchronized(appender) {
+      taskTupleIndexes.remove(taskId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
new file mode 100644
index 0000000..f0699b7
--- /dev/null
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppenderManager.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.conf.TajoConf.ConfVars;
+import org.apache.tajo.util.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Creates and tracks one {@link HashShuffleAppender} per (execution block, partition) pair and turns
+ * them into {@link HashShuffleIntermediate} descriptors when an execution block is closed.
+ */
+public class HashShuffleAppenderManager {
+  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);
+
+  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
+      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
+  private TajoConf systemConf;
+  private FileSystem defaultFS;
+  private FileSystem localFS;
+  private LocalDirAllocator lDirAllocator;
+  // page size in bytes handed to every HashShuffleAppender
+  private int pageSize;
+
+  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
+    this.systemConf = systemConf;
+
+    // initialize LocalDirAllocator
+    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);
+
+    // initialize DFS and LocalFileSystems
+    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
+    localFS = FileSystem.getLocal(systemConf);
+    // SHUFFLE_HASH_APPENDER_PAGE_VOLUME is given in MB; appenders work in bytes
+    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
+  }
+
+  /**
+   * Returns the appender for the given partition of an execution block, creating the partition
+   * file and the appender on first use.
+   */
+  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
+                                         TableMeta meta, Schema outSchema) throws IOException {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);
+
+      if (partitionAppenderMap == null) {
+        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
+        appenderMap.put(ebId, partitionAppenderMap);
+      }
+
+      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
+      if (partitionAppenderMeta == null) {
+        Path dataFile = getDataFile(ebId, partId);
+        FileSystem fs = dataFile.getFileSystem(systemConf);
+        if (fs.exists(dataFile)) {
+          FileStatus status = fs.getFileStatus(dataFile);
+          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
+        }
+
+        if (!fs.exists(dataFile.getParent())) {
+          fs.mkdirs(dataFile.getParent());
+        }
+        FileAppender appender = (FileAppender) StorageManagerFactory.getStorageManager(
+            tajoConf).getAppender(meta, outSchema, dataFile);
+        appender.enableStats();
+        appender.init();
+
+        partitionAppenderMeta = new PartitionAppenderMeta();
+        partitionAppenderMeta.partId = partId;
+        partitionAppenderMeta.dataFile = dataFile;
+        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
+        partitionAppenderMeta.appender.init();
+        partitionAppenderMap.put(partId, partitionAppenderMeta);
+
+        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
+      }
+
+      return partitionAppenderMeta.appender;
+    }
+  }
+
+  /**
+   * Returns the sub-directory index ({@code partId} modulo HASH_SHUFFLE_PARENT_DIRS) holding the file of {@code partId}.
+   */
+  public static int getPartParentId(int partId, TajoConf tajoConf) {
+    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
+  }
+
+  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
+    try {
+      // the base dir for an output dir
+      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
+      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
+
+      // Spread partition files over sub-directories so that an execution block with many
+      // partitions does not put every shuffle file into a single directory.
+      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
+    } catch (Exception e) {
+      LOG.error(e.getMessage(), e);
+      // Keep the original IOException (and its concrete subtype) rather than wrapping it again.
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Closes every partition appender of the given execution block and describes the written files.
+   *
+   * @return one intermediate per partition, or {@code null} if no appender was created for {@code ebId}
+   * @throws IOException the first failure raised while closing an appender; the remaining appenders
+   *                     are still closed before it is thrown
+   */
+  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
+    Map<Integer, PartitionAppenderMeta> partitionAppenderMap;
+    synchronized (appenderMap) {
+      partitionAppenderMap = appenderMap.remove(ebId);
+    }
+
+    if (partitionAppenderMap == null) {
+      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
+      return null;
+    }
+
+    // Send Intermediate data to QueryMaster.
+    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
+    IOException firstFailure = null;
+    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
+      try {
+        eachMeta.appender.close();
+        HashShuffleIntermediate intermediate =
+            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
+                eachMeta.appender.getPages(),
+                eachMeta.appender.getMergedTupleIndexes());
+        intermEntries.add(intermediate);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+        // Keep closing the other partitions; the map entry is already removed, so nobody else will.
+        if (firstFailure == null) {
+          firstFailure = e;
+        }
+      }
+    }
+
+    if (firstFailure != null) {
+      throw firstFailure;
+    }
+
+    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());
+
+    return intermEntries;
+  }
+
+  /**
+   * Drops the tuple indexes that the given task attempt recorded in each partition appender of its
+   * execution block.
+   */
+  public void finalizeTask(QueryUnitAttemptId taskId) {
+    synchronized (appenderMap) {
+      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
+          appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
+      if (partitionAppenderMap == null) {
+        return;
+      }
+
+      for (PartitionAppenderMeta eachAppender: partitionAppenderMap.values()) {
+        eachAppender.appender.taskFinished(taskId);
+      }
+    }
+  }
+
+  /**
+   * Describes one partition file as built by close(): its total length, its pages, and the tuple
+   * indexes of task attempts that had not been finalized when the file was closed.
+   */
+  public static class HashShuffleIntermediate {
+    private int partId;
+
+    private long volume;
+
+    //[<page start offset,<task start, task end>>]
+    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;
+
+    //[<page start offset, length>]
+    private List<Pair<Long, Integer>> pages;
+
+    public HashShuffleIntermediate(int partId, long volume,
+                                   List<Pair<Long, Integer>> pages,
+                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
+      this.partId = partId;
+      this.volume = volume;
+      this.failureTskTupleIndexes = failureTskTupleIndexes;
+      this.pages = pages;
+    }
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public long getVolume() {
+      return volume;
+    }
+
+    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
+      return failureTskTupleIndexes;
+    }
+
+    public List<Pair<Long, Integer>> getPages() {
+      return pages;
+    }
+  }
+
+  static class PartitionAppenderMeta {
+    int partId;
+    HashShuffleAppender appender;
+    Path dataFile;
+
+    public int getPartId() {
+      return partId;
+    }
+
+    public HashShuffleAppender getAppender() {
+      return appender;
+    }
+
+    public Path getDataFile() {
+      return dataFile;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java index 41d1e05..1f57675 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/RawFile.java @@ -729,6 +729,7 @@ public class RawFile { @Override public TableStats getStats() { if (enabledStats) { + stats.setNumBytes(pos); return stats.getTableStat(); } else { return null; http://git-wip-us.apache.org/repos/asf/tajo/blob/f3d63b46/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java ---------------------------------------------------------------------- diff --git a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java index 3b0ee1f..e68e351 100644 --- a/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java +++ b/tajo-yarn-pullserver/src/main/java/org/apache/tajo/pullserver/TajoPullServerService.java @@ -47,6 +47,7 @@ import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.pullserver.listener.FileCloseListener; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.storage.RowStoreUtil; import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; import
org.apache.tajo.storage.Tuple; @@ -207,10 +208,12 @@ public class TajoPullServerService extends AbstractService { selector = RpcChannelFactory.createServerChannelFactory("PullServerAuxService", workerNum); localFS = new LocalFileSystem(); - super.init(new Configuration(conf)); + super.init(conf); this.getConfig().setInt(TajoConf.ConfVars.PULLSERVER_PORT.varname , TajoConf.ConfVars.PULLSERVER_PORT.defaultIntVal); + + LOG.info("Tajo PullServer initialized: readaheadLength=" + readaheadLength); } catch (Throwable t) { LOG.error(t); } @@ -228,9 +231,11 @@ public class TajoPullServerService extends AbstractService { throw new RuntimeException(ex); } bootstrap.setPipelineFactory(pipelineFact); + port = conf.getInt(ConfVars.PULLSERVER_PORT.varname, ConfVars.PULLSERVER_PORT.defaultIntVal); Channel ch = bootstrap.bind(new InetSocketAddress(port)); + accepted.add(ch); port = ((InetSocketAddress)ch.getLocalAddress()).getPort(); conf.set(ConfVars.PULLSERVER_PORT.varname, Integer.toString(port)); @@ -373,10 +378,11 @@ public class TajoPullServerService extends AbstractService { final List<String> taskIdList = params.get("ta"); final List<String> subQueryIds = params.get("sid"); final List<String> partIds = params.get("p"); + final List<String> offsetList = params.get("offset"); + final List<String> lengthList = params.get("length"); - if (types == null || taskIdList == null || subQueryIds == null || qids == null - || partIds == null) { - sendError(ctx, "Required queryId, type, taskIds, subquery Id, and part id", + if (types == null || subQueryIds == null || qids == null || partIds == null) { + sendError(ctx, "Required queryId, type, subquery Id, and part id", BAD_REQUEST); return; } @@ -387,12 +393,18 @@ public class TajoPullServerService extends AbstractService { return; } - final List<FileChunk> chunks = Lists.newArrayList(); - + String partId = partIds.get(0); String queryId = qids.get(0); String shuffleType = types.get(0); String sid = subQueryIds.get(0); - String partId = 
partIds.get(0); + + long offset = (offsetList != null && !offsetList.isEmpty()) ? Long.parseLong(offsetList.get(0)) : -1L; + long length = (lengthList != null && !lengthList.isEmpty()) ? Long.parseLong(lengthList.get(0)) : -1L; + + if (!shuffleType.equals("h") && !shuffleType.equals("s") && taskIdList == null) { + sendError(ctx, "Required taskIds", BAD_REQUEST); + } + List<String> taskIds = splitMaps(taskIdList); LOG.info("PullServer request param: shuffleType=" + shuffleType + @@ -403,6 +415,8 @@ public class TajoPullServerService extends AbstractService { LOG.info("PullServer baseDir: " + conf.get(ConfVars.WORKER_TEMPORAL_DIR.varname) + "/" + queryBaseDir); + final List<FileChunk> chunks = Lists.newArrayList(); + // if a subquery requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); @@ -431,18 +445,29 @@ public class TajoPullServerService extends AbstractService { // if a subquery requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { - for (String ta : taskIds) { - if (!lDirAlloc.ifExists(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId, conf)) { - LOG.warn(e); - sendError(ctx, NO_CONTENT); - return; - } - Path path = localFS.makeQualified( - lDirAlloc.getLocalPathToRead(queryBaseDir + "/" + sid + "/" + ta + "/output/" + partId, conf)); - File file = new File(path.toUri()); - FileChunk chunk = new FileChunk(file, 0, file.length()); - chunks.add(chunk); + int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); + String partPath = queryBaseDir + "/" + sid + "/hash-shuffle/" + partParentId + "/" + partId; + if (!lDirAlloc.ifExists(partPath, conf)) { + LOG.warn("Partition shuffle file not exists: " + partPath); + sendError(ctx, NO_CONTENT); + return; + } + + Path path = localFS.makeQualified(lDirAlloc.getLocalPathToRead(partPath, conf)); + + File file = new File(path.toUri()); + long startPos = (offset >= 0 && length 
>= 0) ? offset : 0; + long readLen = (offset >= 0 && length >= 0) ? length : file.length(); + + if (startPos >= file.length()) { + String errorMessage = "Start pos[" + startPos + "] great than file length [" + file.length() + "]"; + LOG.error(errorMessage); + sendError(ctx, errorMessage, BAD_REQUEST); + return; } + LOG.info("RequestURL: " + request.getUri() + ", fileLen=" + file.length()); + FileChunk chunk = new FileChunk(file, startPos, readLen); + chunks.add(chunk); } else { LOG.error("Unknown shuffle type: " + shuffleType); sendError(ctx, "Unknown shuffle type:" + shuffleType, BAD_REQUEST);
