TAJO-1383: Improve broadcast table cache. (jinho) closes #404
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/e1e38e23 Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/e1e38e23 Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/e1e38e23 Branch: refs/heads/index_support Commit: e1e38e231867e4f6f953a7ec41f5f9d5ad242580 Parents: 7f05695 Author: Jinho Kim <[email protected]> Authored: Fri Mar 13 16:55:25 2015 +0900 Committer: Jinho Kim <[email protected]> Committed: Fri Mar 13 16:55:25 2015 +0900 ---------------------------------------------------------------------- CHANGES | 2 + .../engine/planner/physical/HashJoinExec.java | 51 ++++++-- .../planner/physical/HashLeftOuterJoinExec.java | 74 ++++++++++- .../physical/PartitionMergeScanExec.java | 17 ++- .../tajo/engine/planner/physical/ScanExec.java | 72 +++++++++++ .../engine/planner/physical/SeqScanExec.java | 105 +++------------- .../apache/tajo/engine/utils/CacheHolder.java | 97 +++++++++++++++ .../apache/tajo/engine/utils/TableCache.java | 84 +++++++++++++ .../apache/tajo/engine/utils/TableCacheKey.java | 57 +++++++++ .../apache/tajo/engine/utils/TupleCache.java | 122 ------------------- .../apache/tajo/engine/utils/TupleCacheKey.java | 57 --------- .../worker/ExecutionBlockSharedResource.java | 26 ++++ .../apache/tajo/worker/TaskAttemptContext.java | 2 +- .../apache/tajo/worker/TaskRunnerManager.java | 4 +- .../apache/tajo/engine/util/TestTableCache.java | 109 +++++++++++++++++ .../apache/tajo/engine/util/TestTupleCache.java | 89 -------------- .../plan/serder/LogicalNodeDeserializer.java | 3 + .../tajo/plan/serder/LogicalNodeSerializer.java | 2 + tajo-plan/src/main/proto/Plan.proto | 1 + 19 files changed, 599 insertions(+), 375 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index e6f7917..84a7571 
100644 --- a/CHANGES +++ b/CHANGES @@ -9,6 +9,8 @@ Release 0.11.0 - unreleased IMPROVEMENT + TAJO-1383: Improve broadcast table cache. (jinho) + TAJO-1374: Support multi-bytes delimiter for CSV file. (Contributed by navis, Committed by jinho) http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java index d475b78..3bdf2d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashJoinExec.java @@ -19,15 +19,18 @@ package org.apache.tajo.engine.planner.physical; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.catalog.SchemaUtil; +import org.apache.tajo.engine.utils.CacheHolder; +import org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.JoinNode; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -58,13 +61,14 @@ public class HashJoinExec extends BinaryPhysicalExec { // projection protected final Projector projector; + private TableStats cachedRightTableStats; + public HashJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftExec, PhysicalExec rightExec) { super(context, 
SchemaUtil.merge(leftExec.getSchema(), rightExec.getSchema()), plan.getOutSchema(), leftExec, rightExec); this.plan = plan; this.joinQual = plan.getJoinQual(); - this.tupleSlots = new HashMap<Tuple, List<Tuple>>(100000); // HashJoin only can manage equi join key pairs. this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftExec.getSchema(), @@ -151,8 +155,41 @@ public class HashJoinExec extends BinaryPhysicalExec { } protected void loadRightToHashTable() throws IOException { + ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); + if (scanExec.canBroadcast()) { + /* If this table can broadcast, all tasks in a node will share the same cache */ + TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( + context, scanExec.getCanonicalName(), scanExec.getFragments()); + loadRightFromCache(key); + } else { + this.tupleSlots = buildRightToHashTable(); + } + + first = false; + } + + protected void loadRightFromCache(TableCacheKey key) throws IOException { + ExecutionBlockSharedResource sharedResource = context.getSharedResource(); + synchronized (sharedResource.getLock()) { + if (sharedResource.hasBroadcastCache(key)) { + CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); + this.tupleSlots = data.getData(); + this.cachedRightTableStats = data.getTableStats(); + } else { + CacheHolder.BroadcastCacheHolder holder = + new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null); + sharedResource.addBroadcastCache(key, holder); + CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); + this.tupleSlots = data.getData(); + this.cachedRightTableStats = data.getTableStats(); + } + } + } + + private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { Tuple tuple; Tuple keyTuple; + Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple 
= new VTuple(joinKeyPairs.size()); @@ -160,18 +197,18 @@ public class HashJoinExec extends BinaryPhysicalExec { keyTuple.put(i, tuple.get(rightKeyList[i])); } - List<Tuple> newValue = tupleSlots.get(keyTuple); + List<Tuple> newValue = map.get(keyTuple); if (newValue != null) { newValue.add(tuple); } else { newValue = new ArrayList<Tuple>(); newValue.add(tuple); - tupleSlots.put(keyTuple, newValue); + map.put(keyTuple, newValue); } } - first = false; + return map; } @Override @@ -219,7 +256,7 @@ public class HashJoinExec extends BinaryPhysicalExec { inputStats.setNumRows(leftInputStats.getNumRows()); } - TableStats rightInputStats = rightChild.getInputStats(); + TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats; if (rightInputStats != null) { inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java index 233ef92..e78cb20 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashLeftOuterJoinExec.java @@ -22,7 +22,10 @@ import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.engine.planner.Projector; +import org.apache.tajo.engine.utils.CacheHolder; +import 
org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.engine.utils.TupleUtil; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.catalog.SchemaUtil; @@ -33,6 +36,7 @@ import org.apache.tajo.plan.logical.JoinNode; import org.apache.tajo.storage.FrameTuple; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.worker.ExecutionBlockSharedResource; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; @@ -66,6 +70,7 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { protected Projector projector; private int rightNumCols; + private TableStats cachedRightTableStats; private static final Log LOG = LogFactory.getLog(HashLeftOuterJoinExec.class); public HashLeftOuterJoinExec(TaskAttemptContext context, JoinNode plan, PhysicalExec leftChild, @@ -91,8 +96,6 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { this.joinFilter = null; } - this.tupleSlots = new HashMap<Tuple, List<Tuple>>(10000); - // HashJoin only can manage equi join key pairs. 
this.joinKeyPairs = PlannerUtil.getJoinKeyPairs(joinQual, leftChild.getSchema(), rightChild.getSchema(), false); @@ -201,8 +204,41 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { } protected void loadRightToHashTable() throws IOException { + ScanExec scanExec = PhysicalPlanUtil.findExecutor(rightChild, ScanExec.class); + if (scanExec.canBroadcast()) { + /* If this table can broadcast, all tasks in a node will share the same cache */ + TableCacheKey key = CacheHolder.BroadcastCacheHolder.getCacheKey( + context, scanExec.getCanonicalName(), scanExec.getFragments()); + loadRightFromCache(key); + } else { + this.tupleSlots = buildRightToHashTable(); + } + + first = false; + } + + protected void loadRightFromCache(TableCacheKey key) throws IOException { + ExecutionBlockSharedResource sharedResource = context.getSharedResource(); + synchronized (sharedResource.getLock()) { + if (sharedResource.hasBroadcastCache(key)) { + CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); + this.tupleSlots = data.getData(); + this.cachedRightTableStats = data.getTableStats(); + } else { + CacheHolder.BroadcastCacheHolder holder = + new CacheHolder.BroadcastCacheHolder(buildRightToHashTable(), rightChild.getInputStats(), null); + sharedResource.addBroadcastCache(key, holder); + CacheHolder<Map<Tuple, List<Tuple>>> data = sharedResource.getBroadcastCache(key); + this.tupleSlots = data.getData(); + this.cachedRightTableStats = data.getTableStats(); + } + } + } + + private Map<Tuple, List<Tuple>> buildRightToHashTable() throws IOException { Tuple tuple; Tuple keyTuple; + Map<Tuple, List<Tuple>> map = new HashMap<Tuple, List<Tuple>>(100000); while (!context.isStopped() && (tuple = rightChild.next()) != null) { keyTuple = new VTuple(joinKeyPairs.size()); @@ -210,16 +246,18 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { keyTuple.put(i, tuple.get(rightKeyList[i])); } - List<Tuple> newValue = tupleSlots.get(keyTuple); + 
List<Tuple> newValue = map.get(keyTuple); + if (newValue != null) { newValue.add(tuple); } else { newValue = new ArrayList<Tuple>(); newValue.add(tuple); - tupleSlots.put(keyTuple, newValue); + map.put(keyTuple, newValue); } } - first = false; + + return map; } @Override @@ -250,5 +288,31 @@ public class HashLeftOuterJoinExec extends BinaryPhysicalExec { public JoinNode getPlan() { return this.plan; } + + @Override + public TableStats getInputStats() { + if (leftChild == null) { + return inputStats; + } + TableStats leftInputStats = leftChild.getInputStats(); + inputStats.setNumBytes(0); + inputStats.setReadBytes(0); + inputStats.setNumRows(0); + + if (leftInputStats != null) { + inputStats.setNumBytes(leftInputStats.getNumBytes()); + inputStats.setReadBytes(leftInputStats.getReadBytes()); + inputStats.setNumRows(leftInputStats.getNumRows()); + } + + TableStats rightInputStats = cachedRightTableStats == null ? rightChild.getInputStats() : cachedRightTableStats; + if (rightInputStats != null) { + inputStats.setNumBytes(inputStats.getNumBytes() + rightInputStats.getNumBytes()); + inputStats.setReadBytes(inputStats.getReadBytes() + rightInputStats.getReadBytes()); + inputStats.setNumRows(inputStats.getNumRows() + rightInputStats.getNumRows()); + } + + return inputStats; + } } http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 5692308..a1eaa48 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -34,7 +34,7 @@ import java.util.List; /** * A Scanner 
that reads multiple partitions */ -public class PartitionMergeScanExec extends PhysicalExec { +public class PartitionMergeScanExec extends ScanExec { private final ScanNode plan; private SeqScanExec currentScanner = null; @@ -56,14 +56,16 @@ public class PartitionMergeScanExec extends PhysicalExec { inputStats = new TableStats(); } + @Override public void init() throws IOException { for (CatalogProtos.FragmentProto fragment : fragments) { SeqScanExec scanExec = new SeqScanExec(context, (ScanNode) PlannerUtil.clone(null, plan), - new CatalogProtos.FragmentProto[] {fragment}); + new CatalogProtos.FragmentProto[]{fragment}); scanners.add(scanExec); } progress = 0.0f; rescan(); + super.init(); } @Override @@ -112,11 +114,22 @@ public class PartitionMergeScanExec extends PhysicalExec { progress = 1.0f; } + @Override public String getTableName() { return plan.getTableName(); } @Override + public String getCanonicalName() { + return plan.getCanonicalName(); + } + + @Override + public CatalogProtos.FragmentProto[] getFragments() { + return fragments; + } + + @Override public float getProgress() { if (iterator != null) { float progressSum = 0.0f; http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java new file mode 100644 index 0000000..86874ba --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ScanExec.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner.physical; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.engine.planner.enforce.Enforcer; +import org.apache.tajo.ipc.TajoWorkerProtocol; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; + +public abstract class ScanExec extends PhysicalExec { + + /* if this is a broadcasted table or not */ + private boolean canBroadcast; + + public ScanExec(TaskAttemptContext context, Schema inSchema, Schema outSchema) { + super(context, inSchema, outSchema); + } + + public abstract String getTableName(); + + public abstract String getCanonicalName(); + + public abstract CatalogProtos.FragmentProto[] getFragments(); + + @Override + public void init() throws IOException { + canBroadcast = checkIfBroadcast(); + + super.init(); + } + + public boolean canBroadcast() { + return canBroadcast; + } + + /* check if this scan is broadcasted */ + private boolean checkIfBroadcast() { + Enforcer enforcer = context.getEnforcer(); + + if (enforcer != null && enforcer.hasEnforceProperty(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST)) { + List<TajoWorkerProtocol.EnforceProperty> properties = + enforcer.getEnforceProperties(TajoWorkerProtocol.EnforceProperty.EnforceType.BROADCAST); + + for (TajoWorkerProtocol.EnforceProperty property : 
properties) { + if (getCanonicalName().equals(property.getBroadcast().getTableName())) { + return true; + } + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index c62027d..1078c80 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -21,17 +21,14 @@ package org.apache.tajo.engine.planner.physical; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.datum.Datum; import org.apache.tajo.engine.codegen.CompilationError; import org.apache.tajo.engine.planner.Projector; -import org.apache.tajo.engine.utils.TupleCache; -import org.apache.tajo.engine.utils.TupleCacheKey; -import org.apache.tajo.catalog.SchemaUtil; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.ConstEval; import org.apache.tajo.plan.expr.EvalNode; @@ -42,18 +39,16 @@ import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import 
org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; -import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -public class SeqScanExec extends PhysicalExec { +public class SeqScanExec extends ScanExec { private ScanNode plan; private Scanner scanner = null; @@ -66,10 +61,6 @@ public class SeqScanExec extends PhysicalExec { private TableStats inputStats; - private TupleCacheKey cacheKey; - - private boolean cacheRead = false; - public SeqScanExec(TaskAttemptContext context, ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -78,21 +69,6 @@ public class SeqScanExec extends PhysicalExec { this.qual = plan.getQual(); this.fragments = fragments; - if (plan.isBroadcastTable()) { - String pathNameKey = ""; - if (fragments != null) { - StringBuilder stringBuilder = new StringBuilder(); - for (FragmentProto f : fragments) { - Fragment fragement = FragmentConvertor.convert(context.getConf(), f); - stringBuilder.append(fragement.getKey()); - } - pathNameKey = stringBuilder.toString(); - } - - cacheKey = new TupleCacheKey( - context.getTaskId().getTaskId().getExecutionBlockId().toString(), plan.getTableName(), pathNameKey); - } - if (fragments != null && plan.getTableDesc().hasPartition() && plan.getTableDesc().getPartitionMethod().getPartitionType() == CatalogProtos.PartitionType.COLUMN) { @@ -153,6 +129,7 @@ public class SeqScanExec extends PhysicalExec { } } + @Override public void init() throws IOException { Schema projected; @@ -177,33 +154,7 @@ public class SeqScanExec extends PhysicalExec { projected = outSchema; } - if (cacheKey != null) { - TupleCache tupleCache = TupleCache.getInstance(); - if (tupleCache.isBroadcastCacheReady(cacheKey)) { - openCacheScanner(); - } else { - if (TupleCache.getInstance().lockBroadcastScan(cacheKey)) { - scanAndAddCache(projected); - openCacheScanner(); - } else { - Object lockMonitor = 
tupleCache.getLockMonitor(); - synchronized (lockMonitor) { - try { - lockMonitor.wait(20 * 1000); - } catch (InterruptedException e) { - } - } - if (tupleCache.isBroadcastCacheReady(cacheKey)) { - openCacheScanner(); - } else { - initScanner(projected); - } - } - } - } else { - initScanner(projected); - } - + initScanner(projected); super.init(); } @@ -216,7 +167,7 @@ public class SeqScanExec extends PhysicalExec { private void initScanner(Schema projected) throws IOException { this.projector = new Projector(context, inSchema, outSchema, plan.getTargets()); - TableMeta meta = null; + TableMeta meta; try { meta = (TableMeta) plan.getTableDesc().getMeta().clone(); } catch (CloneNotSupportedException e) { @@ -241,35 +192,6 @@ public class SeqScanExec extends PhysicalExec { } } - private void openCacheScanner() throws IOException { - Scanner cacheScanner = TupleCache.getInstance().openCacheScanner(cacheKey, plan.getPhysicalSchema()); - if (cacheScanner != null) { - scanner = cacheScanner; - cacheRead = true; - } - } - - private void scanAndAddCache(Schema projected) throws IOException { - initScanner(projected); - - List<Tuple> broadcastTupleCacheList = new ArrayList<Tuple>(); - while (!context.isStopped()) { - Tuple tuple = next(); - if (tuple != null) { - broadcastTupleCacheList.add(tuple); - } else { - break; - } - } - - if (scanner != null) { - scanner.close(); - scanner = null; - } - - TupleCache.getInstance().addBroadcastCache(cacheKey, broadcastTupleCacheList); - } - @Override public Tuple next() throws IOException { if (fragments == null) { @@ -281,9 +203,6 @@ public class SeqScanExec extends PhysicalExec { if (!plan.hasQual()) { if ((tuple = scanner.next()) != null) { - if (cacheRead) { - return tuple; - } projector.eval(tuple, outTuple); outTuple.setOffset(tuple.getOffset()); return outTuple; @@ -292,9 +211,6 @@ public class SeqScanExec extends PhysicalExec { } } else { while ((tuple = scanner.next()) != null) { - if (cacheRead) { - return tuple; - } if 
(qual.eval(inSchema, tuple).isTrue()) { projector.eval(tuple, outTuple); return outTuple; @@ -328,11 +244,22 @@ public class SeqScanExec extends PhysicalExec { projector = null; } + @Override public String getTableName() { return plan.getTableName(); } @Override + public String getCanonicalName() { + return plan.getCanonicalName(); + } + + @Override + public CatalogProtos.FragmentProto[] getFragments() { + return fragments; + } + + @Override public float getProgress() { if (scanner == null) { return 1.0f; http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java new file mode 100644 index 0000000..6a5c0bf --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/CacheHolder.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.utils; + +import com.google.common.collect.Maps; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.fragment.FragmentConvertor; +import org.apache.tajo.util.Deallocatable; +import org.apache.tajo.worker.TaskAttemptContext; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public interface CacheHolder<T> { + + /** + * Get a shared data from the TableCache. + */ + T getData(); + + /** + * Get a shared table stats from the TableCache. + */ + TableStats getTableStats(); + + /** + * Release a cache to the memory. + * + */ + void release(); + + /** + * This is a cache-holder for a join table + * It will release when execution block is finished + */ + public static class BroadcastCacheHolder implements CacheHolder<Map<Tuple, List<Tuple>>> { + private Map<Tuple, List<Tuple>> data; + private Deallocatable rowBlock; + private TableStats tableStats; + + public BroadcastCacheHolder(Map<Tuple, List<Tuple>> data, TableStats tableStats, Deallocatable rowBlock){ + this.data = data; + this.tableStats = tableStats; + this.rowBlock = rowBlock; + } + + @Override + public Map<Tuple, List<Tuple>> getData() { + return Maps.newHashMap(data); + } + + @Override + public TableStats getTableStats(){ + return tableStats; + } + + @Override + public void release() { + if(rowBlock != null) rowBlock.release(); + } + + public static TableCacheKey getCacheKey(TaskAttemptContext ctx, String canonicalName, + CatalogProtos.FragmentProto[] fragments) throws IOException { + String pathNameKey = ""; + if (fragments != null) { + StringBuilder stringBuilder = new StringBuilder(); + for (CatalogProtos.FragmentProto f : fragments) { + Fragment fragement = FragmentConvertor.convert(ctx.getConf(), f); + stringBuilder.append(fragement.getKey()); + } + pathNameKey = 
stringBuilder.toString(); + } + + return new TableCacheKey(ctx.getTaskId().getTaskId().getExecutionBlockId().toString(), canonicalName, pathNameKey); + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java new file mode 100644 index 0000000..f2a2217 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCache.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.utils; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; + +import java.util.List; +import java.util.Map; + +/** + * This is a simple TableCache which just added CacheHolder as needed. 
+ */ + public class TableCache { + public static final Log LOG = LogFactory.getLog(TableCache.class); + + private static TableCache instance; + private Map<TableCacheKey, CacheHolder<?>> cacheMap = Maps.newHashMap(); + + private TableCache() { + } + + public static synchronized TableCache getInstance() { + if (instance == null) { + instance = new TableCache(); + } + return instance; + } + + public synchronized void releaseCache(ExecutionBlockId ebId) { + if (ebId == null) { + return; + } + + List<TableCacheKey> keys = getCacheKeyByExecutionBlockId(ebId); + + for (TableCacheKey cacheKey: keys) { + cacheMap.remove(cacheKey).release(); + LOG.info("Removed Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId); + } + } + + public synchronized List<TableCacheKey> getCacheKeyByExecutionBlockId(ExecutionBlockId ebId) { + List<TableCacheKey> keys = Lists.newArrayList(); + for (TableCacheKey eachKey : cacheMap.keySet()) { + if (eachKey.ebId.equals(ebId.toString())) { + keys.add(eachKey); + } + } + return keys; + } + + public synchronized void addCache(TableCacheKey cacheKey, CacheHolder<?> cacheData) { + cacheMap.put(cacheKey, cacheData); + LOG.info("Added Broadcast Table Cache: " + cacheKey.getTableName() + " EbId: " + cacheKey.ebId); + } + + public synchronized boolean hasCache(TableCacheKey cacheKey) { + return cacheMap.containsKey(cacheKey); + } + + public synchronized CacheHolder<?> getCache(TableCacheKey cacheKey) { + return cacheMap.get(cacheKey); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java new file mode 100644 index 0000000..81a4b58 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TableCacheKey.java 
@@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.utils; + +public class TableCacheKey { + String ebId; + String tableName; + String pathName; + + public TableCacheKey(String ebId, String tableName, String pathName) { + this.ebId = ebId; + this.tableName = tableName; + this.pathName = pathName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + @Override + public int hashCode() { + return toString().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + return toString().equals(o.toString()); + } + + @Override + public String toString() { + return ebId + "," + tableName + "," + pathName; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java deleted file mode 
100644 index 00647b5..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCache.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.utils; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class TupleCache { - private static TupleCache instance; - - private Map<TupleCacheKey, List<Tuple>> broadcastTupleCacheData - = new HashMap<TupleCacheKey, List<Tuple>>(); - private Map<TupleCacheKey, TupleCacheStatus> broadcastTupleCacheStatus - = new HashMap<TupleCacheKey, TupleCacheStatus>(); - - private Object lockMonitor = new Object(); - - public static enum TupleCacheStatus { - STARTED, - ENDED - }; - - private TupleCache() { - } - - public static synchronized TupleCache getInstance() { - if (instance == null) { - instance = new TupleCache(); - } - return instance; - } - - public Object getLockMonitor() { - return lockMonitor; - } - - public void removeBroadcastCache(ExecutionBlockId ebId) { - if (ebId == null) { - return; - } - synchronized 
(lockMonitor) { - TupleCacheKey matchedKey = null; - for (TupleCacheKey eachKey: broadcastTupleCacheStatus.keySet()) { - if (eachKey.ebId.equals(ebId.toString())) { - matchedKey = eachKey; - break; - } - } - if (matchedKey != null) { - broadcastTupleCacheStatus.remove(matchedKey); - broadcastTupleCacheData.remove(matchedKey); - } - } - } - - public void addBroadcastCache(TupleCacheKey cacheKey, List<Tuple> cacheData) { - synchronized (lockMonitor) { - if (broadcastTupleCacheStatus.containsKey(cacheKey) && - broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED) { - return; - } - broadcastTupleCacheData.put(cacheKey, cacheData); - broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.ENDED); - lockMonitor.notifyAll(); - } - } - - public boolean lockBroadcastScan(TupleCacheKey cacheKey) { - synchronized (lockMonitor) { - if (broadcastTupleCacheStatus.containsKey(cacheKey)) { - return false; - } else { - broadcastTupleCacheStatus.put(cacheKey, TupleCacheStatus.STARTED); - return true; - } - } - } - - public boolean isBroadcastCacheReady(TupleCacheKey cacheKey) { - synchronized (lockMonitor) { - if (!broadcastTupleCacheStatus.containsKey(cacheKey)) { - return false; - } - return broadcastTupleCacheStatus.get(cacheKey) == TupleCacheStatus.ENDED; - } - } - - public TupleCacheScanner openCacheScanner(TupleCacheKey cacheKey, Schema schema) throws IOException { - synchronized (lockMonitor) { - List<Tuple> cacheData = broadcastTupleCacheData.get(cacheKey); - if (cacheData != null) { - TupleCacheScanner scanner = new TupleCacheScanner(cacheData, schema); - scanner.init(); - return scanner; - } else { - return null; - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java 
b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java deleted file mode 100644 index 1cb01c2..0000000 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleCacheKey.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.engine.utils; - -public class TupleCacheKey { - String ebId; - String tableName; - String pathName; - - public TupleCacheKey(String ebId, String tableName, String pathName) { - this.ebId = ebId; - this.tableName = tableName; - this.pathName = pathName; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - @Override - public int hashCode() { - return toString().hashCode(); - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - return toString().equals(o.toString()); - } - - @Override - public String toString() { - return ebId + "," + tableName + "," + pathName; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java index b193b24..494fd7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockSharedResource.java @@ -21,12 +21,16 @@ package org.apache.tajo.worker; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.Schema; import org.apache.tajo.engine.codegen.ExecutorPreCompiler; import org.apache.tajo.engine.codegen.TajoClassLoader; import org.apache.tajo.engine.json.CoreGsonHelper; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.engine.utils.CacheHolder; +import 
org.apache.tajo.engine.utils.TableCache; +import org.apache.tajo.engine.utils.TableCacheKey; import org.apache.tajo.plan.PlanningException; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.LogicalNode; @@ -38,6 +42,7 @@ public class ExecutionBlockSharedResource { private static Log LOG = LogFactory.getLog(ExecutionBlockSharedResource.class); private AtomicBoolean initializing = new AtomicBoolean(false); private volatile Boolean resourceInitSuccess = Boolean.valueOf(false); + private final Object lock = new Object(); // Query private QueryContext context; @@ -108,6 +113,27 @@ public class ExecutionBlockSharedResource { } } + /* This guarantees a lock for an ExecutionBlock */ + public synchronized Object getLock() { + return lock; + } + + public boolean hasBroadcastCache(TableCacheKey key) { + return TableCache.getInstance().hasCache(key); + } + + public <T extends Object> CacheHolder<T> getBroadcastCache(TableCacheKey key) { + return (CacheHolder<T>) TableCache.getInstance().getCache(key); + } + + public void addBroadcastCache(TableCacheKey cacheKey, CacheHolder<?> cacheData) { + TableCache.getInstance().addCache(cacheKey, cacheData); + } + + public void releaseBroadcastCache(ExecutionBlockId id) { + TableCache.getInstance().releaseCache(id); + } + public void release() { compilationContext = null; http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 50cd20a..706e9b8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -24,8 +24,8 @@ import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos.TaskAttemptState; +import org.apache.tajo.TaskAttemptId; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index 3f4a1b8..a375a31 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -20,7 +20,6 @@ package org.apache.tajo.worker; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -30,7 +29,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.TaskAttemptId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.utils.TupleCache; import org.apache.tajo.worker.event.TaskRunnerEvent; import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.apache.tajo.worker.event.TaskRunnerStopEvent; @@ -184,7 +182,7 @@ public class TaskRunnerManager extends CompositeService implements EventHandler< ExecutionBlockContext executionBlockContext = executionBlockContextMap.remove(event.getExecutionBlockId()); if(executionBlockContext != null){ try { - TupleCache.getInstance().removeBroadcastCache(event.getExecutionBlockId()); + executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId()); 
executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); workerContext.getTaskHistoryWriter().flushTaskHistories(); http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java new file mode 100644 index 0000000..f10f2a1 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTableCache.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.engine.util; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryIdFactory; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.engine.utils.CacheHolder; +import org.apache.tajo.engine.utils.TableCache; +import org.apache.tajo.engine.utils.TableCacheKey; +import org.apache.tajo.worker.ExecutionBlockSharedResource; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +public class TestTableCache { + + @Test + public void testBroadcastTableCache() throws Exception { + + ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId( + QueryIdFactory.newQueryId(System.currentTimeMillis(), 0)); + + final TableCacheKey key = new TableCacheKey(ebId.toString(), "testBroadcastTableCache", "path"); + final ExecutionBlockSharedResource resource = new ExecutionBlockSharedResource(); + + final int parallelCount = 30; + ExecutorService executor = Executors.newFixedThreadPool(parallelCount); + List<Future<CacheHolder<Long>>> tasks = new ArrayList<Future<CacheHolder<Long>>>(); + for (int i = 0; i < parallelCount; i++) { + tasks.add(executor.submit(createTask(key, resource))); + } + + long expected = tasks.get(0).get().getData().longValue(); + + for (Future<CacheHolder<Long>> future : tasks) { + assertEquals(expected, future.get().getData().longValue()); + } + + resource.releaseBroadcastCache(ebId); + assertFalse(resource.hasBroadcastCache(key)); + executor.shutdown(); + } + + private Callable<CacheHolder<Long>> createTask(final TableCacheKey key, final ExecutionBlockSharedResource resource) { + return new Callable<CacheHolder<Long>>() { + @Override + public CacheHolder<Long> call() throws Exception { + CacheHolder<Long> result; 
+ synchronized (resource.getLock()) { + if (!TableCache.getInstance().hasCache(key)) { + final long nanoTime = System.nanoTime(); + final TableStats tableStats = new TableStats(); + tableStats.setNumRows(100); + tableStats.setNumBytes(1000); + + final CacheHolder<Long> cacheHolder = new CacheHolder<Long>() { + + @Override + public Long getData() { + return nanoTime; + } + + @Override + public TableStats getTableStats() { + return tableStats; + } + + @Override + public void release() { + + } + }; + + resource.addBroadcastCache(key, cacheHolder); + } + } + + CacheHolder<?> holder = resource.getBroadcastCache(key); + result = (CacheHolder<Long>) holder; + return result; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java b/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java deleted file mode 100644 index 3d2f307..0000000 --- a/tajo-core/src/test/java/org/apache/tajo/engine/util/TestTupleCache.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.engine.util; - -import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.engine.utils.TupleCache; -import org.apache.tajo.engine.utils.TupleCacheKey; -import org.apache.tajo.storage.Scanner; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; - -import static junit.framework.TestCase.assertEquals; -import static junit.framework.TestCase.assertFalse; -import static junit.framework.TestCase.assertTrue; -import static org.junit.Assert.assertNotNull; - -public class TestTupleCache { - @Test - public void testTupleCcaheBasicFunction() throws Exception { - List<Tuple> tupleData = new ArrayList<Tuple>(); - for (int i = 0; i < 100; i++) { - Datum[] datums = new Datum[5]; - for (int j = 0; j < 5; j++) { - datums[j] = new TextDatum(i + "_" + j); - } - Tuple tuple = new VTuple(datums); - tupleData.add(tuple); - } - - ExecutionBlockId ebId = QueryIdFactory.newExecutionBlockId( - QueryIdFactory.newQueryId(System.currentTimeMillis(), 0)); - - TupleCacheKey cacheKey = new TupleCacheKey(ebId.toString(), "TestTable", "test"); - TupleCache tupleCache = TupleCache.getInstance(); - - assertFalse(tupleCache.isBroadcastCacheReady(cacheKey)); - assertTrue(tupleCache.lockBroadcastScan(cacheKey)); - assertFalse(tupleCache.lockBroadcastScan(cacheKey)); - - tupleCache.addBroadcastCache(cacheKey, tupleData); - assertTrue(tupleCache.isBroadcastCacheReady(cacheKey)); - - Scanner scanner = tupleCache.openCacheScanner(cacheKey, null); - assertNotNull(scanner); - - int count = 0; - - while (true) { - Tuple tuple = scanner.next(); - if (tuple == null) { - break; - } - - assertEquals(tupleData.get(count), tuple); - count++; - } - - 
assertEquals(tupleData.size(), count); - - tupleCache.removeBroadcastCache(ebId); - assertFalse(tupleCache.isBroadcastCacheReady(cacheKey)); - assertTrue(tupleCache.lockBroadcastScan(cacheKey)); - - tupleCache.removeBroadcastCache(ebId); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 5cbed7e..3387157 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -416,6 +416,9 @@ public class LogicalNodeDeserializer { scan.setQual(EvalNodeDeserializer.deserialize(context, scanProto.getQual())); } + if(scanProto.hasBroadcast()){ + scan.setBroadcastTable(scanProto.getBroadcast()); + } scan.setInSchema(convertSchema(protoNode.getInSchema())); scan.setOutSchema(convertSchema(protoNode.getOutSchema())); } http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 39a13ba..1bde955 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -431,6 +431,8 @@ public class LogicalNodeSerializer extends BasicLogicalPlanVisitor<LogicalNodeSe if (scan.hasQual()) { scanBuilder.setQual(EvalNodeSerializer.serialize(scan.getQual())); } + + 
scanBuilder.setBroadcast(scan.isBroadcastTable()); return scanBuilder; } http://git-wip-us.apache.org/repos/asf/tajo/blob/e1e38e23/tajo-plan/src/main/proto/Plan.proto ---------------------------------------------------------------------- diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 3e4f07c..02f52ff 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -104,6 +104,7 @@ message ScanNode { required bool existTargets = 3; repeated Target targets = 4; optional EvalNodeTree qual = 5; + optional bool broadcast = 6; } message PartitionScanSpec {
