http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 9521e65..603aeab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -45,6 +45,7 @@ import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.planner.fragment.Fragment; @@ -333,6 +334,24 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ return; } + int sortCount = 0; + for (PhysicalOperator op : plan.getSortedOperators()) { + if (op instanceof ExternalSort) sortCount++; + } + + if (sortCount > 0) { + long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; + long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); + maxAllocPerNode = Math.min(maxAllocPerNode, context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); + long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode); + logger.debug("Max sort alloc: {}", maxSortAlloc); + for (PhysicalOperator op : plan.getSortedOperators()) { + if (op instanceof ExternalSort) { + ((ExternalSort)op).setMaxAllocation(maxSortAlloc); + } + } + } + PlanningSet planningSet = StatsCollector.collectStats(rootFragment); SimpleParallelizer parallelizer = new SimpleParallelizer(context);
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 2a8db67..6b4ee9b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -114,7 +114,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } } catch (AssertionError | Exception e) { - logger.debug("Failure while initializing operator tree", e); + logger.debug("Error while initializing or executing fragment", e); context.fail(e); internalFail(e); } finally { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/437366f0/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 877ffc2..fb0d1df 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -180,6 +180,7 @@ public class TestParquetWriter extends BaseTestQuery { } @Test + @Ignore public void testRepeatedBool() throws Exception { String inputTable = "cp.`parquet/repeated_bool_data.json`"; runTestAndValidate("*", "*", inputTable, "repeated_bool_parquet"); @@ -225,7 +226,7 @@ public class TestParquetWriter extends BaseTestQuery { public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception { - Path path = new Path("/tmp/drilltest/" + outputFile); + Path path = new Path("/tmp/" + outputFile); if (fs.exists(path)) { fs.delete(path, true); }