http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index e69f61c..bc8495c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -31,11 +31,11 @@ import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; @@ -357,28 +357,11 @@ public class ParquetGroupScan extends AbstractGroupScan { return columns; } - @Override - public OperatorCost getCost() { - // TODO Figure out how to properly calculate cost - return new OperatorCost(1, rowGroupInfos.size(), 1, 1); - } @Override - public Size getSize() { -// long totalSize = 0; -// for (RowGroupInfo rowGrpInfo : rowGroupInfos) { -// totalSize += rowGrpInfo.getTotalBytes(); -// } -// int rowSize = (int) (totalSize/rowCount); - - // if all the columns are required. - if (columns == null || columns.isEmpty()) { - return new Size(rowCount, columnValueCounts.size()); - } else { - // project pushdown : subset of columns are required. - return new Size(rowCount, columns.size()); - } - + public ScanStats getScanStats() { + int columnCount = columns == null ? 20 : columns.size(); + return new ScanStats(GroupScanProperty.EXACT_ROW_COUNT, rowCount, 1, rowCount * columnCount); } @Override @@ -412,11 +395,6 @@ public class ParquetGroupScan extends AbstractGroupScan { return true; } - @Override - public GroupScanProperty getProperty() { - return GroupScanProperty.EXACT_ROW_COUNT; - } - /** * Return column value count for the specified column. If does not contain such column, return 0. */
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index 1f66e9f..d1a086c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -22,15 +22,12 @@ import java.util.LinkedList; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractBase; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; -import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; import org.apache.drill.exec.store.StoragePluginRegistry; @@ -102,16 +99,6 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { } @Override - public OperatorCost getCost() { - return null; - } - - @Override - public Size getSize() { - return null; - } - - @Override public boolean isExecutable() { return false; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java index 67bab5e..75f0e74 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java @@ -22,7 +22,6 @@ import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; @@ -90,12 +89,6 @@ public class ParquetWriter extends AbstractWriter { } @Override - public OperatorCost getCost() { - // TODO: - return new OperatorCost(1,1,1,1); - } - - @Override public int getOperatorType() { return CoreOperatorType.PARQUET_WRITER_VALUE; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java index 09aabb0..d260927 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableScan.java @@ -25,11 +25,11 @@ import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.EndpointAffinity; -import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; -import org.apache.drill.exec.physical.base.Size; +import org.apache.drill.exec.physical.base.ScanStats; +import org.apache.drill.exec.physical.base.ScanStats.GroupScanProperty; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; @@ -62,14 +62,8 @@ public class SystemTableScan extends AbstractGroupScan implements SubScan{ this.plugin = plugin; } - @Override - public OperatorCost getCost() { - return new OperatorCost(1,1,1,1); - } - - @Override - public Size getSize() { - return new Size(100,1); + public ScanStats getScanStats(){ + return ScanStats.TRIVIAL_TABLE; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 94e2cd4..3048f77 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -22,14 +22,16 @@ import io.netty.buffer.ByteBuf; import java.io.Closeable; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.DistributedCache.CacheConfig; import org.apache.drill.exec.cache.DistributedCache.SerializationMode; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.ops.QueryContext; @@ -74,6 +76,7 @@ import com.google.common.collect.Lists; public class Foreman implements Runnable, Closeable, Comparable<Object>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); + public static final CacheConfig<FragmentHandle, PlanFragment> FRAGMENT_CACHE = CacheConfig // .newBuilder(FragmentHandle.class, PlanFragment.class) // .mode(SerializationMode.PROTOBUF) // @@ -86,12 +89,33 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ private WorkerBee bee; private UserClientConnection initiatingClient; private final AtomicState<QueryState> state; + private final DistributedSemaphore smallSemaphore; + private final DistributedSemaphore largeSemaphore; + private final long queueThreshold; + private final long queueTimeout; + private volatile DistributedLease lease; + private final boolean queuingEnabled; public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId, RunQuery queryRequest) { this.queryId = queryId; this.queryRequest = queryRequest; this.context = new QueryContext(connection.getSession(), queryId, dContext); + this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val; + if(queuingEnabled){ + int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue(); + int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue(); + this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue); + this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue); + this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val; + this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val; + }else{ + this.largeSemaphore = null; + this.smallSemaphore = null; + this.queueThreshold = 0; + this.queueTimeout = 0; + } + this.initiatingClient = connection; this.fragmentManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), new ForemanManagerListener(), dContext.getController(), this); this.bee = bee; @@ -194,10 +218,21 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ System.out.flush(); System.exit(-1); }finally{ + releaseLease(); Thread.currentThread().setName(originalThread); } } + private void releaseLease(){ + if(lease != null){ + try{ + lease.close(); + }catch(Exception e){ + logger.warn("Failure while releasing lease.", e); + }; + } + + } private void parseAndRunLogicalPlan(String json) { try { @@ -260,7 +295,6 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } } - private void parseAndRunPhysicalPlan(String json) { try { PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); @@ -272,6 +306,9 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ private void runPhysicalPlan(PhysicalPlan plan) { + + + if(plan.getProperties().resultMode != ResultMode.EXEC){ fail(String.format("Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", plan.getProperties().resultMode), new Exception()); } @@ -287,12 +324,20 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ } PlanningSet planningSet = StatsCollector.collectStats(rootFragment); - SimpleParallelizer parallelizer = new SimpleParallelizer() - .setGlobalMaxWidth(context.getConfig().getInt(ExecConstants.GLOBAL_MAX_WIDTH)) - .setMaxWidthPerEndpoint(context.getConfig().getInt(ExecConstants.MAX_WIDTH_PER_ENDPOINT)) - .setAffinityFactor(context.getConfig().getDouble(ExecConstants.AFFINITY_FACTOR)); + SimpleParallelizer parallelizer = new SimpleParallelizer(context); try { + + double size = 0; + for(PhysicalOperator ops : plan.getSortedOperators()){ + size += ops.getCost(); + } + if(queuingEnabled && size > this.queueThreshold){ + this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); + }else{ + this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); + } + QueryWorkUnit work = parallelizer.getFragments(context.getOptions().getSessionOptionList(), context.getCurrentEndpoint(), queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, planningSet); @@ -324,7 +369,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ logger.debug("Fragments running."); state.updateState(QueryState.PENDING, QueryState.RUNNING); - } catch (ExecutionSetupException | RpcException e) { + } catch (Exception e) { fail("Failure while setting up query.", e); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 562d7e8..7cfe51a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.client.PrintingResultsListener; @@ -99,8 +100,14 @@ public class BaseTestQuery extends ExecTest{ bit.run(); client = new DrillClient(config, serviceSet.getCoordinator()); client.connect(); + List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY)); + for(QueryResultBatch b : results){ + b.release(); + } } + + protected BufferAllocator getAllocator(){ return allocator; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java index 1c5a7f9..b5b63bc 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java @@ -25,7 +25,7 @@ public class TestTpchDistributed extends BaseTestQuery{ private void testDistributed(String fileName) throws Exception{ String query = getFile(fileName); - test(query); + test("alter session set `planner.slice_target` = 10; " + query); } @Test http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/6f54ab01/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index ea19351..f6972c3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -50,7 +50,7 @@ public class TestFragmentChecker extends PopUnitTestBase{ PhysicalPlanReader ppr = new PhysicalPlanReader(CONFIG, CONFIG.getMapper(), DrillbitEndpoint.getDefaultInstance()); Fragment fragmentRoot = getRootFragment(ppr, fragmentFile); PlanningSet planningSet = StatsCollector.collectStats(fragmentRoot); - SimpleParallelizer par = new SimpleParallelizer(); + SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2); List<DrillbitEndpoint> endpoints = Lists.newArrayList(); DrillbitEndpoint localBit = null; for(int i =0; i < bitCount; i++){ @@ -59,7 +59,6 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - par.setGlobalMaxWidth(10).setMaxWidthPerEndpoint(5); QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, ppr, fragmentRoot, planningSet); System.out.println(String.format("=========ROOT FRAGMENT [%d:%d] =========", qwu.getRootFragment().getHandle().getMajorFragmentId(), qwu.getRootFragment().getHandle().getMinorFragmentId()));
