TAJO-1950: Query master uses too much memory during range shuffle. Closes #884
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/1f9ae1da Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/1f9ae1da Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/1f9ae1da Branch: refs/heads/master Commit: 1f9ae1da0424731567cea18e975c47d4479b0ae9 Parents: e8ee7f2 Author: Jihoon Son <[email protected]> Authored: Thu Dec 24 18:17:56 2015 +0900 Committer: Jihoon Son <[email protected]> Committed: Thu Dec 24 18:17:56 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../java/org/apache/tajo/conf/TajoConf.java | 4 + .../apache/tajo/ha/TestHAServiceHDFSImpl.java | 4 +- .../apache/tajo/master/TestRepartitioner.java | 81 ++--- .../org/apache/tajo/worker/TestFetcher.java | 2 +- .../apache/tajo/engine/query/TaskRequest.java | 6 +- .../tajo/engine/query/TaskRequestImpl.java | 13 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 32 -- .../tajo/querymaster/DefaultTaskScheduler.java | 13 +- .../tajo/querymaster/FetchScheduleEvent.java | 7 +- .../apache/tajo/querymaster/Repartitioner.java | 206 ++++++++----- .../java/org/apache/tajo/querymaster/Stage.java | 3 +- .../java/org/apache/tajo/querymaster/Task.java | 48 ++- .../tajo/worker/ExecutionBlockContext.java | 68 ++++- .../java/org/apache/tajo/worker/FetchImpl.java | 140 ++++----- .../java/org/apache/tajo/worker/Fetcher.java | 45 ++- .../java/org/apache/tajo/worker/TaskImpl.java | 174 ++++++----- tajo-core/src/main/proto/ResourceProtos.proto | 14 +- .../src/main/resources/webapps/worker/task.jsp | 17 +- .../tajo/pullserver/HttpDataServerHandler.java | 6 +- .../tajo/pullserver/TajoPullServerService.java | 302 ++++++++++++++----- .../apache/tajo/storage/index/bst/BSTIndex.java | 54 +++- 22 files changed, 811 insertions(+), 430 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index 2edf9d7..c02bfc7 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,8 @@ Release 0.12.0 - unreleased IMPROVEMENT + TAJO-1950: Query master uses too much memory during range shuffle. (jihoon) + TAJO-1858: Aligning error message in execute query page of web UI is needed. (Byunghwa Yun via jihoon) http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 9ab3dfa..29cf9ee 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -212,6 +212,10 @@ public class TajoConf extends Configuration { // Shuffle Configuration -------------------------------------------------- PULLSERVER_PORT("tajo.pullserver.port", 0, Validators.range("0", "65535")), + PULLSERVER_CACHE_SIZE("tajo.pullserver.index-cache.size", 10000, Validators.min("1")), + PULLSERVER_CACHE_TIMEOUT("tajo.pullserver.index-cache.timeout-min", 5, Validators.min("1")), + PULLSERVER_FETCH_URL_MAX_LENGTH("tajo.pullserver.fetch-url.max-length", StorageUnit.KB, + Validators.min("1")), SHUFFLE_SSL_ENABLED_KEY("tajo.pullserver.ssl.enabled", false, Validators.bool()), SHUFFLE_FILE_FORMAT("tajo.shuffle.file-format", BuiltinStorages.RAW, Validators.javaString()), SHUFFLE_FETCHER_PARALLEL_EXECUTION_MAX_NUM("tajo.shuffle.fetcher.parallel-execution.max-num", http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java index 279fce7..81eeb1f 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/ha/TestHAServiceHDFSImpl.java @@ -88,7 +88,9 @@ public class TestHAServiceHDFSImpl { assertEquals(2, fs.listStatus(activePath).length); assertEquals(0, fs.listStatus(backupPath).length); } finally { - backupMaster.stop(); + if (backupMaster != null) { + backupMaster.stop(); + } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java index b0a3a17..c260ab6 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestRepartitioner.java @@ -29,13 +29,15 @@ import org.apache.tajo.TestTajoIds; import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.querymaster.Task.PullHost; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.Pair; -import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.FetchImpl; import org.junit.Test; import java.net.URI; import java.util.*; +import java.util.stream.Collectors; import static junit.framework.Assert.assertEquals; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -70,11 +72,9 @@ public class TestRepartitioner { new HashMap<>(); for (Map.Entry<Integer, List<IntermediateEntry>> eachEntry: intermediateEntries.entrySet()) { - FetchImpl fetch = new FetchImpl(new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE, + FetchImpl fetch = new FetchImpl(sid.toString(), new Task.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE, sid, eachEntry.getKey(), eachEntry.getValue()); - fetch.setName(sid.toString()); - FetchProto proto = fetch.getProto(); fetch = new FetchImpl(proto); assertEquals(proto, fetch.getProto()); @@ -84,7 +84,7 @@ public class TestRepartitioner { hashEntries.put(eachEntry.getKey(), ebEntries); - List<URI> uris = fetch.getURIs(); + List<URI> uris = Repartitioner.createFullURIs(2 * StorageUnit.KB, fetch.getProto()); assertEquals(1, uris.size()); //In Hash Suffle, Fetcher return only one URI per partition. URI uri = uris.get(0); @@ -119,7 +119,7 @@ public class TestRepartitioner { ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); FetchImpl [] fetches = new FetchImpl[12]; for (int i = 0; i < 12; i++) { - fetches[i] = new FetchImpl(new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2); + fetches[i] = new FetchImpl(tableName, new Task.PullHost("localhost", 10000 + i), HASH_SHUFFLE, ebId, i / 2); } int [] VOLUMES = {100, 80, 70, 30, 10, 5}; @@ -128,37 +128,40 @@ public class TestRepartitioner { fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1])); } - Pair<Long [], Map<String, List<FetchImpl>>[]> results; + FetchProto[] expectedProtos = new FetchProto[fetches.length]; + expectedProtos = Arrays.stream(fetches).map(fetch -> fetch.getProto()).collect(Collectors.toList()) + .toArray(expectedProtos); + Pair<Long [], Map<String, List<FetchProto>>[]> results; results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1); long expected [] = {100 + 80 + 70 + 30 + 10 + 5}; assertFetchVolumes(expected, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2); long expected0 [] = {140, 155}; assertFetchVolumes(expected0, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3); long expected1 [] = {100, 95, 100}; assertFetchVolumes(expected1, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4); long expected2 [] = {100, 80, 70, 45}; assertFetchVolumes(expected2, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5); long expected3 [] = {100, 80, 70, 30, 15}; assertFetchVolumes(expected3, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6); long expected4 [] = {100, 80, 70, 30, 10, 5}; assertFetchVolumes(expected4, results.getFirst()); - assertFetchImpl(fetches, results.getSecond()); + assertFetchProto(expectedProtos, results.getSecond()); } private static void assertFetchVolumes(long [] expected, Long [] results) { @@ -191,7 +194,8 @@ public class TestRepartitioner { } long splitVolume = 128 * 1024 * 1024; - List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries, splitVolume, 10 * 1024 * 1024); assertEquals(6, fetches.size()); @@ -199,10 +203,10 @@ public class TestRepartitioner { int index = 0; int numZeroPosFetcher = 0; long totalLength = 0; - for (List<FetchImpl> eachFetchList: fetches) { + for (List<FetchProto> eachFetchList: fetches) { totalInterms += eachFetchList.size(); long eachFetchVolume = 0; - for (FetchImpl eachFetch: eachFetchList) { + for (FetchProto eachFetch: eachFetchList) { eachFetchVolume += eachFetch.getLength(); if (eachFetch.getOffset() == 0) { numZeroPosFetcher++; @@ -248,8 +252,9 @@ public class TestRepartitioner { intermediateEntries.add(interm); } + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); long splitVolume = 128 * 1024 * 1024; - List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries, splitVolume, 10 * 1024 * 1024); assertEquals(32, fetches.size()); @@ -258,15 +263,15 @@ public class TestRepartitioner { long totalLength = 0; Set<String> uniqPullHost = new HashSet<>(); - for (List<FetchImpl> eachFetchList: fetches) { + for (List<FetchProto> eachFetchList: fetches) { long length = 0; - for (FetchImpl eachFetch: eachFetchList) { + for (FetchProto eachFetch: eachFetchList) { if (eachFetch.getOffset() == 0) { numZeroPosFetcher++; } totalLength += eachFetch.getLength(); length += eachFetch.getLength(); - uniqPullHost.add(eachFetch.getPullHost().toString()); + uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString()); } assertTrue(length + " should be smaller than splitVolume", length < splitVolume); if (index < fetches.size() - 1) { @@ -378,7 +383,8 @@ public class TestRepartitioner { } long splitVolume = 256 * 1024 * 1024; - List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, entries, splitVolume, + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, entries, splitVolume, 10 * 1024 * 1024); @@ -393,13 +399,13 @@ public class TestRepartitioner { {728355084,121760359}, }; int index = 0; - for (List<FetchImpl> eachFetchList: fetches) { + for (List<FetchProto> eachFetchList: fetches) { if (index == 3) { assertEquals(2, eachFetchList.size()); } else { assertEquals(1, eachFetchList.size()); } - for (FetchImpl eachFetch: eachFetchList) { + for (FetchProto eachFetch: eachFetchList) { assertEquals(expected[index][0], eachFetch.getOffset()); assertEquals(expected[index][1], eachFetch.getLength()); index++; @@ -438,13 +444,14 @@ public class TestRepartitioner { } long splitVolume = 128 * 1024 * 1024; - List<List<FetchImpl>> fetches = Repartitioner.splitOrMergeIntermediates(null, intermediateEntries, + ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); + List<List<FetchProto>> fetches = Repartitioner.splitOrMergeIntermediates("name", ebId, intermediateEntries, splitVolume, 10 * 1024 * 1024); assertEquals(32, fetches.size()); int expectedSize = 0; - Set<FetchImpl> fetchSet = new HashSet<>(); - for(List<FetchImpl> list : fetches){ + Set<FetchProto> fetchSet = new HashSet<>(); + for(List<FetchProto> list : fetches){ expectedSize += list.size(); fetchSet.addAll(list); } @@ -456,15 +463,15 @@ public class TestRepartitioner { long totalLength = 0; Set<String> uniqPullHost = new HashSet<>(); - for (List<FetchImpl> eachFetchList: fetches) { + for (List<FetchProto> eachFetchList: fetches) { long length = 0; - for (FetchImpl eachFetch: eachFetchList) { + for (FetchProto eachFetch: eachFetchList) { if (eachFetch.getOffset() == 0) { numZeroPosFetcher++; } totalLength += eachFetch.getLength(); length += eachFetch.getLength(); - uniqPullHost.add(eachFetch.getPullHost().toString()); + uniqPullHost.add(new PullHost(eachFetch.getHost(), eachFetch.getPort()).toString()); } assertTrue(length + " should be smaller than splitVolume", length < splitVolume); if (index < fetches.size() - 1) { @@ -482,25 +489,25 @@ public class TestRepartitioner { ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0); Task.PullHost pullHost = new Task.PullHost("localhost", 0); - FetchImpl expected = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); - FetchImpl fetch2 = new FetchImpl(pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); + FetchImpl expected = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); + FetchImpl fetch2 = new FetchImpl("name", pullHost, SCATTERED_HASH_SHUFFLE, ebId, 1); assertEquals(expected, fetch2); fetch2.setOffset(5); fetch2.setLength(10); assertNotEquals(expected, fetch2); } - private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) { - Set<FetchImpl> expectedURLs = Sets.newHashSet(); + private static void assertFetchProto(FetchProto [] expected, Map<String, List<FetchProto>>[] result) { + Set<FetchProto> expectedURLs = Sets.newHashSet(); - for (FetchImpl f : expected) { + for (FetchProto f : expected) { expectedURLs.add(f); } - Set<FetchImpl> resultURLs = Sets.newHashSet(); + Set<FetchProto> resultURLs = Sets.newHashSet(); - for (Map<String, List<FetchImpl>> e : result) { - for (List<FetchImpl> list : e.values()) { + for (Map<String, List<FetchProto>> e : result) { + for (List<FetchProto> list : e.values()) { resultURLs.addAll(list); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java index a91fc30..dfc37b0 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/TestFetcher.java @@ -90,7 +90,7 @@ public class TestFetcher { FileChunk storeChunk = new FileChunk(new File(OUTPUT_DIR + "data"), 0, 0); storeChunk.setFromRemote(true); final Fetcher fetcher = new Fetcher(conf, uri, storeChunk); - FileChunk chunk = fetcher.get(); + FileChunk chunk = fetcher.get().get(0); assertNotNull(chunk); assertNotNull(chunk.getFile()); http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java index 48d4780..ef4ff60 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequest.java @@ -21,6 +21,7 @@ */ package org.apache.tajo.engine.query; +import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.ResourceProtos.TaskRequestProto; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.proto.CatalogProtos; @@ -29,7 +30,6 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.enforce.Enforcer; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.plan.serder.PlanProto; -import org.apache.tajo.worker.FetchImpl; import java.util.List; @@ -39,8 +39,8 @@ public interface TaskRequest extends ProtoObject<TaskRequestProto> { List<CatalogProtos.FragmentProto> getFragments(); PlanProto.LogicalNodeTree getPlan(); void setInterQuery(); - void addFetch(String name, FetchImpl fetch); - List<FetchImpl> getFetches(); + void addFetch(FetchProto fetch); + List<FetchProto> getFetches(); QueryContext getQueryContext(TajoConf conf); DataChannel getDataChannel(); Enforcer getEnforcer(); http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java index 7b52dab..fc7556c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/TaskRequestImpl.java @@ -42,7 +42,7 @@ public class TaskRequestImpl implements TaskRequest { private boolean clusteredOutput; private PlanProto.LogicalNodeTree plan; // logical node private Boolean interQuery; - private List<FetchImpl> fetches; + private List<FetchProto> fetches; private QueryContext queryContext; private DataChannel dataChannel; private Enforcer enforcer; @@ -157,10 +157,10 @@ public class TaskRequestImpl implements TaskRequest { this.interQuery = true; } - public void addFetch(String name, FetchImpl fetch) { + @Override + public void addFetch(FetchProto fetch) { maybeInitBuilder(); initFetches(); - fetch.setName(name); fetches.add(fetch); } @@ -212,7 +212,8 @@ public class TaskRequestImpl implements TaskRequest { return this.enforcer; } - public List<FetchImpl> getFetches() { + @Override + public List<FetchProto> getFetches() { initFetches(); return this.fetches; @@ -225,7 +226,7 @@ public class TaskRequestImpl implements TaskRequest { TaskRequestProtoOrBuilder p = viaProto ? proto : builder; this.fetches = new ArrayList<>(); for(FetchProto fetch : p.getFetchesList()) { - fetches.add(new FetchImpl(fetch)); + fetches.add(fetch); } } @@ -259,7 +260,7 @@ public class TaskRequestImpl implements TaskRequest { } if (this.fetches != null) { for (int i = 0; i < fetches.size(); i++) { - builder.addFetches(fetches.get(i).getProto()); + builder.addFetches(fetches.get(i)); } } if (this.queryMasterHostAndPort != null) { http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index bae04c8..31c23f4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -20,7 +20,6 @@ package org.apache.tajo.engine.utils; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; -import org.apache.commons.codec.binary.Base64; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; @@ -29,47 +28,16 @@ import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.statistics.ColumnStats; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.RowStoreUtil; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.TupleRange; import org.apache.tajo.storage.VTuple; -import java.io.UnsupportedEncodingException; -import java.net.URLEncoder; import java.util.List; import java.util.Map; public class TupleUtil { private static final Log LOG = LogFactory.getLog(TupleUtil.class); - public static String rangeToQuery(Schema schema, TupleRange range, boolean last) - throws UnsupportedEncodingException { - return rangeToQuery(range, last, RowStoreUtil.createEncoder(schema)); - } - - public static String rangeToQuery(TupleRange range, boolean last, RowStoreEncoder encoder) - throws UnsupportedEncodingException { - StringBuilder sb = new StringBuilder(); - byte [] firstKeyBytes = encoder.toBytes(range.getStart()); - byte [] endKeyBytes = encoder.toBytes(range.getEnd()); - - String firstKeyBase64 = new String(Base64.encodeBase64(firstKeyBytes)); - String lastKeyBase64 = new String(Base64.encodeBase64(endKeyBytes)); - - sb.append("start=") - .append(URLEncoder.encode(firstKeyBase64, "utf-8")) - .append("&") - .append("end=") - .append(URLEncoder.encode(lastKeyBase64, "utf-8")); - - if (last) { - sb.append("&final=true"); - } - - return sb.toString(); - } - /** * if max value is null, set ranges[last] * @param sortSpecs http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index f1c0f62..26dc103 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -52,7 +52,6 @@ import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.TUtil; -import org.apache.tajo.worker.FetchImpl; import java.net.InetSocketAddress; import java.util.*; @@ -233,11 +232,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { } } else if (event instanceof FetchScheduleEvent) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; - Map<String, List<FetchImpl>> fetches = castEvent.getFetches(); + Map<String, List<FetchProto>> fetches = castEvent.getFetches(); TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); scheduledObjectNum++; - for (Entry<String, List<FetchImpl>> eachFetch : fetches.entrySet()) { + for (Entry<String, List<FetchProto>> eachFetch : fetches.entrySet()) { task.addFetches(eachFetch.getKey(), eachFetch.getValue()); task.addFragment(fragmentsForNonLeafTask[0], true); if (fragmentsForNonLeafTask[1] != null) { @@ -983,11 +982,11 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } - for(Map.Entry<String, Set<FetchImpl>> entry: task.getFetchMap().entrySet()) { - Collection<FetchImpl> fetches = entry.getValue(); + for(Map.Entry<String, Set<FetchProto>> entry: task.getFetchMap().entrySet()) { + Collection<FetchProto> fetches = entry.getValue(); if (fetches != null) { - for (FetchImpl fetch : fetches) { - taskAssign.addFetch(entry.getKey(), fetch); + for (FetchProto fetch : fetches) { + taskAssign.addFetch(fetch); } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java index 5fe2f80..e4e63b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/FetchScheduleEvent.java @@ -19,6 +19,7 @@ package org.apache.tajo.querymaster; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.master.event.TaskSchedulerEvent; import org.apache.tajo.worker.FetchImpl; @@ -26,15 +27,15 @@ import java.util.List; import java.util.Map; public class FetchScheduleEvent extends TaskSchedulerEvent { - private final Map<String, List<FetchImpl>> fetches; + private final Map<String, List<FetchProto>> fetches; // map of table name and fetch list public FetchScheduleEvent(final EventType eventType, final ExecutionBlockId blockId, - final Map<String, List<FetchImpl>> fetches) { + final Map<String, List<FetchProto>> fetches) { super(eventType, blockId); this.fetches = fetches; } - public Map<String, List<FetchImpl>> getFetches() { + public Map<String, List<FetchProto>> getFetches() { return fetches; } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java index e64cd51..bd8311f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Repartitioner.java @@ -25,8 +25,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.SessionVars; import org.apache.tajo.algebra.JoinType; +import org.apache.tajo.annotation.NotNull; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; @@ -48,7 +50,9 @@ import org.apache.tajo.plan.logical.SortNode.SortPurpose; import org.apache.tajo.plan.serder.PlanProto.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.plan.serder.PlanProto.EnforceProperty; import org.apache.tajo.plan.util.PlannerUtil; +import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.querymaster.Task.IntermediateEntry; +import org.apache.tajo.querymaster.Task.PullHost; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.Fragment; @@ -57,13 +61,17 @@ import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.FetchImpl; +import org.apache.tajo.worker.FetchImpl.RangeParam; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.math.BigInteger; import java.net.URI; +import java.net.URLEncoder; import java.util.*; import java.util.Map.Entry; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*; @@ -75,7 +83,6 @@ import static org.apache.tajo.plan.serder.PlanProto.ShuffleType.*; public class Repartitioner { private static final Log LOG = LogFactory.getLog(Repartitioner.class); - private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900; private final static String UNKNOWN_HOST = "unknown"; public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage) @@ -546,12 +553,13 @@ public class Repartitioner { private static void addJoinShuffle(Stage stage, int partitionId, Map<ExecutionBlockId, List<IntermediateEntry>> grouppedPartitions) { - Map<String, List<FetchImpl>> fetches = new HashMap<>(); + Map<String, List<FetchProto>> fetches = new HashMap<>(); for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) { if (grouppedPartitions.containsKey(execBlock.getId())) { - Collection<FetchImpl> requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE, + String name = execBlock.getId().toString(); + List<FetchProto> requests = mergeShuffleRequest(name, partitionId, HASH_SHUFFLE, grouppedPartitions.get(execBlock.getId())); - fetches.put(execBlock.getId().toString(), Lists.newArrayList(requests)); + fetches.put(name, requests); } } @@ -568,9 +576,10 @@ public class Repartitioner { * * @return key: pullserver's address, value: a list of requests */ - private static Collection<FetchImpl> mergeShuffleRequest(int partitionId, - ShuffleType type, - List<IntermediateEntry> partitions) { + private static List<FetchProto> mergeShuffleRequest(final String fetchName, + final int partitionId, + final ShuffleType type, + final List<IntermediateEntry> partitions) { // ebId + pullhost -> FetchImmpl Map<String, FetchImpl> mergedPartitions = new HashMap<>(); @@ -582,12 +591,15 @@ public class Repartitioner { fetch.addPart(partition.getTaskId(), partition.getAttemptId()); } else { // In some cases like union each IntermediateEntry has different EBID. - FetchImpl fetch = new FetchImpl(partition.getPullHost(), type, partition.getEbId(), partitionId); + FetchImpl fetch = new FetchImpl(fetchName, partition.getPullHost(), type, partition.getEbId(), partitionId); fetch.addPart(partition.getTaskId(), partition.getAttemptId()); mergedPartitions.put(mergedKey, fetch); } } - return mergedPartitions.values(); + + return mergedPartitions.values().stream() + .map(fetch -> fetch.getProto()) + .collect(Collectors.toList()); } public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext, @@ -699,44 +711,44 @@ public class Repartitioner { new String[]{UNKNOWN_HOST}); Stage.scheduleFragment(stage, dummyFragment); - List<FetchImpl> fetches = new ArrayList<>(); + Map<Pair<PullHost, ExecutionBlockId>, FetchImpl> fetches = new HashMap<>(); List<ExecutionBlock> childBlocks = masterPlan.getChilds(stage.getId()); for (ExecutionBlock childBlock : childBlocks) { Stage childExecSM = stage.getContext().getStage(childBlock.getId()); for (Task qu : childExecSM.getTasks()) { for (IntermediateEntry p : qu.getIntermediateData()) { - FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); - fetch.addPart(p.getTaskId(), p.getAttemptId()); - fetches.add(fetch); + Pair<PullHost, ExecutionBlockId> key = new Pair<>(p.getPullHost(), childBlock.getId()); + if (fetches.containsKey(key)) { + fetches.get(key).addPart(p.getTaskId(), p.getAttemptId()); + } else { + FetchImpl fetch = new FetchImpl(scan.getTableName(), p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); + fetch.addPart(p.getTaskId(), p.getAttemptId()); + fetches.put(key, fetch); + } } } } - SortedMap<TupleRange, Collection<FetchImpl>> map; + SortedMap<TupleRange, Collection<FetchProto>> map; map = new TreeMap<>(); - Set<FetchImpl> fetchSet; - try { - RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema); - for (int i = 0; i < ranges.length; i++) { - fetchSet = new HashSet<>(); - for (FetchImpl fetch: fetches) { - String rangeParam = - TupleUtil.rangeToQuery(ranges[i], i == (ranges.length - 1) , encoder); - FetchImpl copy = null; - try { - copy = fetch.clone(); - } catch (CloneNotSupportedException e) { - throw new RuntimeException(e); - } - copy.setRangeParams(rangeParam); - fetchSet.add(copy); + Set<FetchProto> fetchSet; + RowStoreUtil.RowStoreEncoder encoder = RowStoreUtil.createEncoder(sortSchema); + for (int i = 0; i < ranges.length; i++) { + fetchSet = new HashSet<>(); + RangeParam rangeParam = new RangeParam(ranges[i], i == (ranges.length - 1), encoder); + for (FetchImpl fetch : fetches.values()) { + FetchImpl copy = null; + try { + copy = fetch.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); } - map.put(ranges[i], fetchSet); + copy.setRangeParams(rangeParam); + fetchSet.add(copy.getProto()); } - } catch (UnsupportedEncodingException e) { - LOG.error(e); + map.put(ranges[i], fetchSet); } scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum); @@ -744,20 +756,20 @@ public class Repartitioner { schedulerContext.setEstimatedTaskNum(determinedTaskNum); } - public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchImpl>> partitions, - String tableName, int num) { + public static void scheduleFetchesByRoundRobin(Stage stage, Map<?, Collection<FetchProto>> partitions, + String tableName, int num) { int i; - Map<String, List<FetchImpl>>[] fetchesArray = new Map[num]; + Map<String, List<FetchProto>>[] fetchesArray = new Map[num]; for (i = 0; i < num; i++) { fetchesArray[i] = new HashMap<>(); } i = 0; - for (Entry<?, Collection<FetchImpl>> entry : partitions.entrySet()) { - Collection<FetchImpl> value = entry.getValue(); + for (Entry<?, Collection<FetchProto>> entry : partitions.entrySet()) { + Collection<FetchProto> value = entry.getValue(); TUtil.putCollectionToNestedList(fetchesArray[i++], tableName, value); if (i == num) i = 0; } - for (Map<String, List<FetchImpl>> eachFetches : fetchesArray) { + for (Map<String, List<FetchProto>> eachFetches : fetchesArray) { Stage.scheduleFetches(stage, eachFetches); } } @@ -785,6 +797,10 @@ public class Repartitioner { return totalVolume; } + public List<FetchProto> getFetchProtos() { + return fetchUrls.stream().map(fetch -> fetch.getProto()).collect(Collectors.toList()); + } + } public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, @@ -821,7 +837,7 @@ public class Repartitioner { Map<Task.PullHost, List<IntermediateEntry>> hashedByHost = hashByHost(interm.getValue()); for (Entry<Task.PullHost, List<IntermediateEntry>> e : hashedByHost.entrySet()) { - FetchImpl fetch = new FetchImpl(e.getKey(), channel.getShuffleType(), + FetchImpl fetch = new FetchImpl(scan.getTableName(), e.getKey(), channel.getShuffleType(), block.getId(), interm.getKey(), e.getValue()); long volumeSum = 0; @@ -891,20 +907,16 @@ public class Repartitioner { } } - public static Pair<Long [], Map<String, List<FetchImpl>>[]> makeEvenDistributedFetchImpl( + public static Pair<Long [], Map<String, List<FetchProto>>[]> makeEvenDistributedFetchImpl( Map<Integer, FetchGroupMeta> partitions, String tableName, int num) { // Sort fetchGroupMeta in a descending order of data volumes. List<FetchGroupMeta> fetchGroupMetaList = Lists.newArrayList(partitions.values()); - Collections.sort(fetchGroupMetaList, new Comparator<FetchGroupMeta>() { - @Override - public int compare(FetchGroupMeta o1, FetchGroupMeta o2) { - return o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0); - } - }); + Collections.sort(fetchGroupMetaList, (o1, o2) -> + o1.getVolume() < o2.getVolume() ? 1 : (o1.getVolume() > o2.getVolume() ? -1 : 0)); // Initialize containers - Map<String, List<FetchImpl>>[] fetchesArray = new Map[num]; + Map<String, List<FetchProto>>[] fetchesArray = new Map[num]; Long [] assignedVolumes = new Long[num]; // initialization for (int i = 0; i < num; i++) { @@ -925,7 +937,7 @@ public class Repartitioner { FetchGroupMeta fetchGroupMeta = iterator.next(); assignedVolumes[p] += fetchGroupMeta.getVolume(); - TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls); + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos()); p++; } @@ -933,13 +945,13 @@ public class Repartitioner { while (p >= 0 && iterator.hasNext()) { FetchGroupMeta fetchGroupMeta = iterator.next(); assignedVolumes[p] += fetchGroupMeta.getVolume(); - TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls); + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.getFetchProtos()); // While the current one is smaller than next one, it adds additional fetches to current one. while(iterator.hasNext() && (p > 0 && assignedVolumes[p - 1] > assignedVolumes[p])) { FetchGroupMeta additionalFetchGroup = iterator.next(); assignedVolumes[p] += additionalFetchGroup.getVolume(); - TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.fetchUrls); + TUtil.putCollectionToNestedList(fetchesArray[p], tableName, additionalFetchGroup.getFetchProtos()); } p--; @@ -951,9 +963,9 @@ public class Repartitioner { public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map<Integer, FetchGroupMeta> partitions, String tableName, int num) { - Map<String, List<FetchImpl>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); + Map<String, List<FetchProto>>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); // Schedule FetchImpls - for (Map<String, List<FetchImpl>> eachFetches : fetchsArray) { + for (Map<String, List<FetchProto>> eachFetches : fetchsArray) { Stage.scheduleFetches(stage, eachFetches); } } @@ -976,7 +988,7 @@ public class Repartitioner { throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " + "tajo.shuffle.hash.appender.page.volumn-mb"); } - List<List<FetchImpl>> fetches = new ArrayList<>(); + List<List<FetchProto>> fetches = new ArrayList<>(); long totalIntermediateSize = 0L; for (Entry<ExecutionBlockId, List<IntermediateEntry>> listEntry : intermediates.entrySet()) { @@ -996,7 +1008,7 @@ public class Repartitioner { // Grouping or splitting to fit $DIST_QUERY_TABLE_PARTITION_VOLUME size for (List<IntermediateEntry> partitionEntries : partitionIntermMap.values()) { - List<List<FetchImpl>> eachFetches = splitOrMergeIntermediates(listEntry.getKey(), partitionEntries, + List<List<FetchProto>> eachFetches = splitOrMergeIntermediates(tableName, listEntry.getKey(), partitionEntries, splitVolume, pageSize); if (eachFetches != null && !eachFetches.isEmpty()) { fetches.addAll(eachFetches); @@ -1007,8 +1019,8 @@ public class Repartitioner { schedulerContext.setEstimatedTaskNum(fetches.size()); int i = 0; - Map<String, List<FetchImpl>>[] fetchesArray = new Map[fetches.size()]; - for(List<FetchImpl> entry : fetches) { + Map<String, List<FetchProto>>[] fetchesArray = new Map[fetches.size()]; + for(List<FetchProto> entry : fetches) { fetchesArray[i] = new HashMap<>(); fetchesArray[i].put(tableName, entry); @@ -1030,16 +1042,16 @@ public class Repartitioner { * @param splitVolume * @return */ - public static List<List<FetchImpl>> splitOrMergeIntermediates( + public static List<List<FetchProto>> splitOrMergeIntermediates(@NotNull String fetchName, ExecutionBlockId ebId, List<IntermediateEntry> entries, long splitVolume, long pageSize) { // Each List<FetchImpl> has splitVolume size. - List<List<FetchImpl>> fetches = new ArrayList<>(); + List<List<FetchProto>> fetches = new ArrayList<>(); Iterator<IntermediateEntry> iter = entries.iterator(); if (!iter.hasNext()) { return null; } - List<FetchImpl> fetchListForSingleTask = new ArrayList<>(); + List<FetchProto> fetchListForSingleTask = new ArrayList<>(); long fetchListVolume = 0; while (iter.hasNext()) { @@ -1065,11 +1077,11 @@ public class Repartitioner { fetchListForSingleTask = new ArrayList<>(); fetchListVolume = 0; } - FetchImpl fetch = new FetchImpl(currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE, + FetchImpl fetch = new FetchImpl(fetchName, currentInterm.getPullHost(), SCATTERED_HASH_SHUFFLE, ebId, currentInterm.getPartId(), TUtil.newList(currentInterm)); fetch.setOffset(eachSplit.getFirst()); fetch.setLength(eachSplit.getSecond()); - fetchListForSingleTask.add(fetch); + fetchListForSingleTask.add(fetch.getProto()); fetchListVolume += eachSplit.getSecond(); } } @@ -1079,19 +1091,56 @@ public class Repartitioner { return fetches; } - public static List<URI> createFetchURL(FetchImpl fetch, boolean includeParts) { + /** + * Get the pull server URIs. + */ + public static List<URI> createFullURIs(int maxUrlLength, FetchProto fetch) { + return createFetchURL(maxUrlLength, fetch, true); + } + + /** + * Get the pull server URIs without repeated parameters. + */ + public static List<URI> createSimpleURIs(int maxUrlLength, FetchProto fetch) { + return createFetchURL(maxUrlLength, fetch, false); + } + + private static String getRangeParam(FetchProto proto) { + StringBuilder sb = new StringBuilder(); + String firstKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeStart().toByteArray())); + String lastKeyBase64 = new String(org.apache.commons.codec.binary.Base64.encodeBase64(proto.getRangeEnd().toByteArray())); + + try { + sb.append("start=") + .append(URLEncoder.encode(firstKeyBase64, "utf-8")) + .append("&") + .append("end=") + .append(URLEncoder.encode(lastKeyBase64, "utf-8")); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + + if (proto.getRangeLastInclusive()) { + sb.append("&final=true"); + } + + return sb.toString(); + } + + public static List<URI> createFetchURL(int maxUrlLength, FetchProto fetch, boolean includeParts) { String scheme = "http://"; StringBuilder urlPrefix = new StringBuilder(scheme); - urlPrefix.append(fetch.getPullHost().getHost()).append(":").append(fetch.getPullHost().getPort()).append("/?") - .append("qid=").append(fetch.getExecutionBlockId().getQueryId().toString()) - .append("&sid=").append(fetch.getExecutionBlockId().getId()) + ExecutionBlockId ebId = new ExecutionBlockId(fetch.getExecutionBlockId()); + urlPrefix.append(fetch.getHost()).append(":").append(fetch.getPort()).append("/?") + .append("qid=").append(ebId.getQueryId().toString()) + .append("&sid=").append(ebId.getId()) .append("&p=").append(fetch.getPartitionId()) .append("&type="); if (fetch.getType() == HASH_SHUFFLE) { urlPrefix.append("h"); } else if (fetch.getType() == RANGE_SHUFFLE) { - urlPrefix.append("r").append("&").append(fetch.getRangeParams()); + urlPrefix.append("r").append("&").append(getRangeParam(fetch)); } else if (fetch.getType() == SCATTERED_HASH_SHUFFLE) { urlPrefix.append("s"); } @@ -1105,17 +1154,26 @@ public class Repartitioner { if (fetch.getType() == HASH_SHUFFLE || fetch.getType() == SCATTERED_HASH_SHUFFLE) { fetchURLs.add(URI.create(urlPrefix.toString())); } else { + urlPrefix.append("&ta="); // If the get request is longer than 2000 characters, // the long request uri may cause HTTP Status Code - 414 Request-URI Too Long. // Refer to http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.4.15 // The below code transforms a long request to multiple requests. List<String> taskIdsParams = new ArrayList<>(); StringBuilder taskIdListBuilder = new StringBuilder(); - List<Integer> taskIds = fetch.getTaskIds(); - List<Integer> attemptIds = fetch.getAttemptIds(); + + final List<Integer> taskIds = fetch.getTaskIdList(); + final List<Integer> attemptIds = fetch.getAttemptIdList(); + + // Sort task ids to increase cache hit in pull server + final List<Pair<Integer, Integer>> taskAndAttemptIds = IntStream.range(0, taskIds.size()) + .mapToObj(i -> new Pair<>(taskIds.get(i), attemptIds.get(i))) + .sorted((p1, p2) -> p1.getFirst() - p2.getFirst()) + .collect(Collectors.toList()); + boolean first = true; - for (int i = 0; i < taskIds.size(); i++) { + for (int i = 0; i < taskAndAttemptIds.size(); i++) { StringBuilder taskAttemptId = new StringBuilder(); if (!first) { // when comma is added? @@ -1124,17 +1182,16 @@ public class Repartitioner { first = false; } - int taskId = taskIds.get(i); + int taskId = taskAndAttemptIds.get(i).getFirst(); if (taskId < 0) { // In the case of hash shuffle each partition has single shuffle file per worker. // TODO If file is large, consider multiple fetching(shuffle file can be split) continue; } - int attemptId = attemptIds.get(i); + int attemptId = taskAndAttemptIds.get(i).getSecond(); taskAttemptId.append(taskId).append("_").append(attemptId); - if (taskIdListBuilder.length() + taskAttemptId.length() - > HTTP_REQUEST_MAXIMUM_LENGTH) { + if (urlPrefix.length() + taskIdListBuilder.length() > maxUrlLength) { taskIdsParams.add(taskIdListBuilder.toString()); taskIdListBuilder = new StringBuilder(taskId + "_" + attemptId); } else { @@ -1145,7 +1202,6 @@ public class Repartitioner { if (taskIdListBuilder.length() > 0) { taskIdsParams.add(taskIdListBuilder.toString()); } - urlPrefix.append("&ta="); for (String param : taskIdsParams) { fetchURLs.add(URI.create(urlPrefix + param)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 254df64..5f050bf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -65,7 +65,6 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.RpcParameterFactory; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; -import org.apache.tajo.worker.FetchImpl; import java.io.IOException; import java.net.InetSocketAddress; @@ -1189,7 +1188,7 @@ public class Stage implements EventHandler<StageEvent> { stage.getId(), leftFragment, rightFragments)); } - public static void scheduleFetches(Stage stage, Map<String, List<FetchImpl>> fetches) { + public static void scheduleFetches(Stage stage, Map<String, List<FetchProto>> fetches) { stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, stage.getId(), fetches)); } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 95a7170..9d038ac 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -34,6 +34,7 @@ import org.apache.tajo.TajoProtos.TaskAttemptState; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; @@ -46,7 +47,6 @@ import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.TaskHistory; -import org.apache.tajo.worker.FetchImpl; import java.net.URI; import java.util.*; @@ -55,8 +55,8 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import static org.apache.tajo.ResourceProtos.*; +import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; public class Task implements EventHandler<TaskEvent> { /** Class Logger */ @@ -70,7 +70,7 @@ public class Task implements EventHandler<TaskEvent> { private List<ScanNode> scan; private Map<String, Set<FragmentProto>> fragMap; - private Map<String, Set<FetchImpl>> fetchMap; + private Map<String, Set<FetchProto>> fetchMap; private int totalFragmentNum; @@ -100,6 +100,8 @@ public class Task implements EventHandler<TaskEvent> { private TaskHistory finalTaskHistory; + private final int maxUrlLength; + protected static final StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent> stateMachineFactory = new StateMachineFactory <Task, TaskState, TaskEventType, TaskEvent>(TaskState.NEW) @@ -207,6 +209,8 @@ public class Task implements EventHandler<TaskEvent> { stateMachine = stateMachineFactory.make(this); totalFragmentNum = 0; + maxUrlLength = conf.getInt(ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.name(), + ConfVars.PULLSERVER_FETCH_URL_MAX_LENGTH.defaultIntVal); } public boolean isLeafTask() { @@ -282,9 +286,9 @@ public class Task implements EventHandler<TaskEvent> { taskHistory.setFragments(fragmentList.toArray(new String[fragmentList.size()])); List<String[]> fetchList = new ArrayList<>(); - for (Map.Entry<String, Set<FetchImpl>> e : getFetchMap().entrySet()) { - for (FetchImpl f : e.getValue()) { - for (URI uri : f.getSimpleURIs()){ + for (Map.Entry<String, Set<FetchProto>> e : getFetchMap().entrySet()) { + for (FetchProto f : e.getValue()) { + for (URI uri : Repartitioner.createSimpleURIs(maxUrlLength, f)) { fetchList.add(new String[] {e.getKey(), uri.toString()}); } } @@ -364,8 +368,8 @@ public class Task implements EventHandler<TaskEvent> { return succeededWorker; } - public void addFetches(String tableId, Collection<FetchImpl> fetches) { - Set<FetchImpl> fetchSet; + public void addFetches(String tableId, Collection<FetchProto> fetches) { + Set<FetchProto> fetchSet; if (fetchMap.containsKey(tableId)) { fetchSet = fetchMap.get(tableId); } else { @@ -375,7 +379,7 @@ public class Task implements EventHandler<TaskEvent> { fetchMap.put(tableId, fetchSet); } - public void setFetches(Map<String, Set<FetchImpl>> fetches) { + public void setFetches(Map<String, Set<FetchProto>> fetches) { this.fetchMap.clear(); this.fetchMap.putAll(fetches); } @@ -395,27 +399,15 @@ public class Task implements EventHandler<TaskEvent> { public TaskId getId() { return taskId; } - - public Collection<FetchImpl> getFetchHosts(String tableId) { - return fetchMap.get(tableId); - } - - public Collection<Set<FetchImpl>> getFetches() { + + public Collection<Set<FetchProto>> getFetches() { return fetchMap.values(); } - public Map<String, Set<FetchImpl>> getFetchMap() { + public Map<String, Set<FetchProto>> getFetchMap() { return fetchMap; } - - public Collection<FetchImpl> getFetch(ScanNode scan) { - return this.fetchMap.get(scan.getTableName()); - } - - public ScanNode[] getScanNodes() { - return this.scan.toArray(new ScanNode[scan.size()]); - } - + @Override public String toString() { StringBuilder builder = new StringBuilder(); @@ -426,10 +418,10 @@ public class Task implements EventHandler<TaskEvent> { builder.append(fragment).append(", "); } } - for (Entry<String, Set<FetchImpl>> e : fetchMap.entrySet()) { + for (Entry<String, Set<FetchProto>> e : fetchMap.entrySet()) { builder.append(e.getKey()).append(" : "); - for (FetchImpl t : e.getValue()) { - for (URI uri : t.getURIs()){ + for (FetchProto t : e.getValue()) { + for (URI uri : Repartitioner.createFullURIs(maxUrlLength, t)){ builder.append(uri).append(" "); } } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 65cb6ac..098567a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -20,6 +20,12 @@ package org.apache.tajo.worker; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.*; +import io.netty.handler.codec.http.HttpHeaders.Names; +import io.netty.handler.codec.http.HttpHeaders.Values; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -32,20 +38,20 @@ import org.apache.tajo.TajoProtos; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.pullserver.TajoPullServerService; -import org.apache.tajo.rpc.AsyncRpcClient; -import org.apache.tajo.rpc.CallFuture; -import org.apache.tajo.rpc.NullCallback; -import org.apache.tajo.rpc.RpcClientManager; +import org.apache.tajo.rpc.*; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.storage.HashShuffleAppenderManager; import org.apache.tajo.util.Pair; import org.apache.tajo.worker.event.ExecutionBlockErrorEvent; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -191,10 +197,64 @@ public class ExecutionBlockContext { } tasks.clear(); taskHistories.clear(); + + // Clear index cache in pull server + clearIndexCache(); + resource.release(); RpcClientManager.cleanup(queryMasterClient); } + /** + * Send a request to {@link TajoPullServerService} to clear index cache + */ + private void clearIndexCache() { + // Avoid unnecessary cache clear request when the current eb is a leaf eb + if (executionBlockId.getId() > 1) { + Bootstrap bootstrap = new Bootstrap() + .group(NettyUtils.getSharedEventLoopGroup(NettyUtils.GROUP.FETCHER, 1)) + .channel(NioSocketChannel.class) + .option(ChannelOption.ALLOCATOR, NettyUtils.ALLOCATOR) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30 * 1000) + .option(ChannelOption.TCP_NODELAY, true); + ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>() { + @Override + protected void initChannel(Channel channel) throws Exception { + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("codec", new HttpClientCodec()); + } + }; + bootstrap.handler(initializer); + + WorkerConnectionInfo connInfo = workerContext.getConnectionInfo(); + ChannelFuture future = bootstrap.connect(new InetSocketAddress(connInfo.getHost(), connInfo.getPullServerPort())) + .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + + try { + Channel channel = future.await().channel(); + if (!future.isSuccess()) { + // Upon failure to connect to pull server, cache clear message is just ignored. + LOG.warn(future.cause()); + return; + } + + // Example of URI: /ebid=eb_1450063997899_0015_000002 + ExecutionBlockId clearEbId = new ExecutionBlockId(executionBlockId.getQueryId(), executionBlockId.getId() - 1); + HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.DELETE, "ebid=" + clearEbId.toString()); + request.headers().set(Names.HOST, connInfo.getHost()); + request.headers().set(Names.CONNECTION, Values.CLOSE); + channel.writeAndFlush(request); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } finally { + if (future != null && future.channel().isOpen()) { + // Close the channel to exit. + future.channel().close(); + } + } + } + } + public TajoConf getConf() { return systemConf; } http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java index 7d2033c..b49d449 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java @@ -21,16 +21,21 @@ package org.apache.tajo.worker; import com.google.common.base.Objects; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.ResourceProtos.FetchProto; import org.apache.tajo.common.ProtoObject; -import org.apache.tajo.querymaster.Repartitioner; import org.apache.tajo.querymaster.Task; +import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; +import org.apache.tajo.storage.TupleRange; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.TUtil; -import java.net.URI; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import static org.apache.tajo.plan.serder.PlanProto.ShuffleType; @@ -42,8 +47,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { private ShuffleType type; // hash or range partition method. private ExecutionBlockId executionBlockId; // The executionBlock id private int partitionId; // The hash partition id - private String name; // The intermediate source name - private String rangeParams; // optional, the http parameters of range partition. (e.g., start=xx&end=yy) + private final String name; // The intermediate source name + private RangeParam rangeParam; // optional, range parameter for range shuffle private boolean hasNext = false; // optional, if true, has more taskIds private List<Integer> taskIds; // repeated, the task ids @@ -52,19 +57,48 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { private long offset = -1; private long length = -1; - public FetchImpl() { - taskIds = new ArrayList<>(); - attemptIds = new ArrayList<>(); + public static class RangeParam { + private byte[] start; + private byte[] end; + private boolean lastInclusive; + + public RangeParam(TupleRange range, boolean lastInclusive, RowStoreEncoder encoder) { + this.start = encoder.toBytes(range.getStart()); + this.end = encoder.toBytes(range.getEnd()); + this.lastInclusive = lastInclusive; + } + + public RangeParam(byte[] start, byte[] end, boolean lastInclusive) { + this.start = start; + this.end = end; + this.lastInclusive = lastInclusive; + } + + public byte[] getStart() { + return start; + } + + public byte[] getEnd() { + return end; + } + + public boolean isLastInclusive() { + return lastInclusive; + } + + @Override + public int hashCode() { + return Objects.hashCode(Arrays.hashCode(start), Arrays.hashCode(end), lastInclusive); + } } public FetchImpl(FetchProto proto) { - this(new Task.PullHost(proto.getHost(), proto.getPort()), + this(proto.getName(), + new Task.PullHost(proto.getHost(), proto.getPort()), proto.getType(), new ExecutionBlockId(proto.getExecutionBlockId()), proto.getPartitionId(), - proto.getRangeParams(), proto.getHasNext(), - proto.getName(), proto.getTaskIdList(), proto.getAttemptIdList()); if (proto.hasOffset()) { @@ -74,31 +108,41 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { if (proto.hasLength()) { this.length = proto.getLength(); } + + if (proto.hasRangeStart()) { + this.rangeParam = new RangeParam(proto.getRangeStart().toByteArray(), + proto.getRangeEnd().toByteArray(), proto.getRangeLastInclusive()); + } } - public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, int partitionId) { - this(host, type, executionBlockId, partitionId, null, false, null, - new ArrayList<>(), new ArrayList<>()); + this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>()); } - public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, int partitionId, List<Task.IntermediateEntry> intermediateEntryList) { - this(host, type, executionBlockId, partitionId, null, false, null, + this(name, host, type, executionBlockId, partitionId, null, false, new ArrayList<>(), new ArrayList<>()); for (Task.IntermediateEntry entry : intermediateEntryList){ addPart(entry.getTaskId(), entry.getAttemptId()); } } - public FetchImpl(Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, - int partitionId, String rangeParams, boolean hasNext, String name, + public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId, boolean hasNext, + List<Integer> taskIds, List<Integer> attemptIds) { + this(name, host, type, executionBlockId, partitionId, null, hasNext, taskIds, attemptIds); + } + + public FetchImpl(String name, Task.PullHost host, ShuffleType type, ExecutionBlockId executionBlockId, + int partitionId, RangeParam rangeParam, boolean hasNext, List<Integer> taskIds, List<Integer> attemptIds) { this.host = host; this.type = type; this.executionBlockId = executionBlockId; this.partitionId = partitionId; - this.rangeParams = rangeParams; + this.rangeParam = rangeParam; this.hasNext = hasNext; this.name = name; this.taskIds = taskIds; @@ -107,7 +151,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { @Override public int hashCode() { - return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParams, + return Objects.hashCode(host, type, executionBlockId, partitionId, name, rangeParam.hashCode(), hasNext, taskIds, attemptIds, offset, length); } @@ -123,11 +167,14 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { builder.setHasNext(hasNext); builder.setName(name); - if (rangeParams != null && !rangeParams.isEmpty()) { - builder.setRangeParams(rangeParams); + if (rangeParam != null) { + builder.setRangeStart(ByteString.copyFrom(rangeParam.getStart())); + builder.setRangeEnd(ByteString.copyFrom(rangeParam.getEnd())); + builder.setRangeLastInclusive(rangeParam.isLastInclusive()); } Preconditions.checkArgument(taskIds.size() == attemptIds.size()); + builder.addAllTaskId(taskIds); builder.addAllAttemptId(attemptIds); @@ -141,10 +188,6 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { this.attemptIds.add(attemptId); } - public Task.PullHost getPullHost() { - return this.host; - } - public ExecutionBlockId getExecutionBlockId() { return executionBlockId; } @@ -153,20 +196,8 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { this.executionBlockId = executionBlockId; } - public int getPartitionId() { - return partitionId; - } - - public void setPartitionId(int partitionId) { - this.partitionId = partitionId; - } - - public String getRangeParams() { - return rangeParams; - } - - public void setRangeParams(String rangeParams) { - this.rangeParams = rangeParams; + public void setRangeParams(RangeParam rangeParams) { + this.rangeParam = rangeParams; } public boolean hasNext() { @@ -185,36 +216,10 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { this.type = type; } - /** - * Get the pull server URIs. - */ - public List<URI> getURIs(){ - return Repartitioner.createFetchURL(this, true); - } - - /** - * Get the pull server URIs without repeated parameters. - */ - public List<URI> getSimpleURIs(){ - return Repartitioner.createFetchURL(this, false); - } - public String getName() { return name; } - public void setName(String name) { - this.name = name; - } - - public List<Integer> getTaskIds() { - return taskIds; - } - - public List<Integer> getAttemptIds() { - return attemptIds; - } - public long getOffset() { return offset; } @@ -238,8 +243,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { newFetchImpl.type = type; newFetchImpl.executionBlockId = executionBlockId; newFetchImpl.partitionId = partitionId; - newFetchImpl.name = name; - newFetchImpl.rangeParams = rangeParams; + newFetchImpl.rangeParam = rangeParam; newFetchImpl.hasNext = hasNext; if (taskIds != null) { newFetchImpl.taskIds = Lists.newArrayList(taskIds); @@ -269,7 +273,7 @@ public class FetchImpl implements ProtoObject<FetchProto>, Cloneable { TUtil.checkEquals(executionBlockId, fetch.executionBlockId) && TUtil.checkEquals(host, fetch.host) && TUtil.checkEquals(name, fetch.name) && - TUtil.checkEquals(rangeParams, fetch.rangeParams) && + TUtil.checkEquals(rangeParam, fetch.rangeParam) && TUtil.checkEquals(taskIds, fetch.taskIds) && TUtil.checkEquals(type, fetch.type) && TUtil.checkEquals(offset, fetch.offset) && http://git-wip-us.apache.org/repos/asf/tajo/blob/1f9ae1da/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java index b5abffe..250b4cc 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Fetcher.java @@ -31,7 +31,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoProtos.FetcherState; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.pullserver.TajoPullServerService; import org.apache.tajo.pullserver.retriever.FileChunk; import org.apache.tajo.rpc.NettyUtils; @@ -42,6 +44,8 @@ import java.io.RandomAccessFile; import java.net.InetSocketAddress; import java.net.URI; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; /** @@ -67,6 +71,7 @@ public class Fetcher { private TajoProtos.FetcherState state; private Bootstrap bootstrap; + private List<Long> chunkLengths = new ArrayList<>(); public Fetcher(TajoConf conf, URI uri, FileChunk chunk) { this.uri = uri; @@ -97,9 +102,6 @@ public class Fetcher { conf.getIntVar(TajoConf.ConfVars.SHUFFLE_FETCHER_CONNECT_TIMEOUT) * 1000) .option(ChannelOption.SO_RCVBUF, 1048576) // set 1M .option(ChannelOption.TCP_NODELAY, true); - - ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile()); - bootstrap.handler(initializer); } } @@ -123,12 +125,20 @@ public class Fetcher { return messageReceiveCount; } - public FileChunk get() throws IOException { + public List<FileChunk> get() throws IOException { + List<FileChunk> fileChunks = new ArrayList<>(); if (useLocalFile) { startTime = System.currentTimeMillis(); finishTime = System.currentTimeMillis(); state = TajoProtos.FetcherState.FETCH_FINISHED; - return fileChunk; + fileChunks.add(fileChunk); + fileLen = fileChunk.getFile().length(); + return fileChunks; + } + + if (state == FetcherState.FETCH_INIT) { + ChannelInitializer<Channel> initializer = new HttpClientChannelInitializer(fileChunk.getFile()); + bootstrap.handler(initializer); } this.startTime = System.currentTimeMillis(); @@ -136,7 +146,7 @@ public class Fetcher { ChannelFuture future = null; try { future = bootstrap.clone().connect(new InetSocketAddress(host, port)) - .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); + .addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE); // Wait until the connection attempt succeeds or fails. Channel channel = future.awaitUninterruptibly().channel(); @@ -154,7 +164,7 @@ public class Fetcher { request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP); if(LOG.isDebugEnabled()) { - LOG.info("Status: " + getState() + ", URI:" + uri); + LOG.debug("Status: " + getState() + ", URI:" + uri); } // Send the HTTP request. channel.writeAndFlush(request); @@ -163,7 +173,18 @@ public class Fetcher { channel.closeFuture().syncUninterruptibly(); fileChunk.setLength(fileChunk.getFile().length()); - return fileChunk; + + long start = 0; + for (Long eachChunkLength : chunkLengths) { + if (eachChunkLength == 0) continue; + FileChunk chunk = new FileChunk(fileChunk.getFile(), start, eachChunkLength); + chunk.setEbId(fileChunk.getEbId()); + chunk.setFromRemote(fileChunk.fromRemote()); + fileChunks.add(chunk); + start += eachChunkLength; + } + return fileChunks; + } finally { if(future != null && future.channel().isOpen()){ // Close the channel to exit. @@ -226,6 +247,13 @@ public class Fetcher { } } } + if (response.headers().contains(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME)) { + String stringOffset = response.headers().get(TajoPullServerService.CHUNK_LENGTH_HEADER_NAME); + + for (String eachSplit : stringOffset.split(",")) { + chunkLengths.add(Long.parseLong(eachSplit)); + } + } } if (LOG.isDebugEnabled()) { LOG.debug(sb.toString()); @@ -296,6 +324,7 @@ public class Fetcher { if(getState() != TajoProtos.FetcherState.FETCH_FINISHED){ //channel is closed, but cannot complete fetcher finishTime = System.currentTimeMillis(); + LOG.error("Channel closed by peer: " + ctx.channel()); state = TajoProtos.FetcherState.FETCH_FAILED; } IOUtils.cleanup(LOG, fc, raf);
