DRILL-180: propagate exceptions back to client. fail all whenever any query without queryId is received. fix RunningFragmentManager and Foreman to include queryId when building results @Ignore ConstantRopTest.testRefInterp() until plan is updated.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0fc89a31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0fc89a31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0fc89a31 Branch: refs/heads/master Commit: 0fc89a317251e684c797ad0e5e5c68c3842ab3b3 Parents: 7edd361 Author: Steven Phillips <[email protected]> Authored: Thu Aug 22 16:53:19 2013 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Thu Aug 22 16:53:19 2013 -0700 ---------------------------------------------------------------------- .../drill/exec/rpc/RpcExceptionHandler.java | 2 +- .../drill/exec/rpc/user/QueryResultHandler.java | 10 ++++++++ .../exec/work/batch/BitComHandlerImpl.java | 9 +++++++ .../apache/drill/exec/work/foreman/Foreman.java | 1 + .../work/foreman/RunningFragmentManager.java | 5 +++- .../store/parquet/ParquetRecordReaderTest.java | 5 ++-- .../drill/exec/ref/rops/ConstantROPTest.java | 26 +++++++++----------- .../exec/ref/src/test/resources/constant2.json | 2 +- 8 files changed, 40 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java index a0aed94..0123cad 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcExceptionHandler.java @@ -30,7 +30,7 @@ public class RpcExceptionHandler implements ChannelHandler{ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(!ctx.channel().isOpen()) return; - logger.info("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause); + logger.error("Exception in pipeline. Closing channel between local " + ctx.channel().localAddress() + " and remote " + ctx.channel().remoteAddress(), cause); ctx.close(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index b2283a2..50f8c5a 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -63,6 +63,9 @@ public class QueryResultHandler { l = resultsListener.putIfAbsent(result.getQueryId(), bl); // if we had a succesful insert, use that reference. Otherwise, just throw away the new bufering listener. if (l == null) l = bl; + if (result.getQueryId().toString().equals("")) { + failAll(); + } } if(failed){ @@ -80,6 +83,13 @@ public class QueryResultHandler { resultsListener.remove(result.getQueryId(), l); } + + } + + private void failAll() { + for (UserResultsListener l : resultsListener.values()) { + l.submissionFailed(new RpcException("Received result without QueryId")); + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java index 5807c87..8cba493 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BitComHandlerImpl.java @@ -124,6 +124,15 @@ public class BitComHandlerImpl implements BitComHandler { listener.fail(fragment.getHandle(), "Failure while parsing fragment execution plan.", e); }catch(ExecutionSetupException e){ listener.fail(fragment.getHandle(), "Failure while setting up execution plan.", e); + } catch (Exception e) { + listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e); + } catch (OutOfMemoryError t) { + if(t.getMessage().startsWith("Direct buffer")){ + listener.fail(fragment.getHandle(), "Failure due to error", t); + }else{ + throw t; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index c1fd9e5..bd64938 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -106,6 +106,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ .addError(error) // .setIsLastChunk(true) // .setQueryState(QueryState.FAILED) // + .setQueryId(queryId) // .build(); cleanupAndSendResult(result); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java index da2f7c1..9d9aca6 100644 --- a/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java +++ b/sandbox/prototype/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/RunningFragmentManager.java @@ -147,7 +147,10 @@ class RunningFragmentManager implements FragmentStatusListener{ updateStatus(status); int remaining = remainingFragmentCount.decrementAndGet(); if(remaining == 0){ - QueryResult result = QueryResult.newBuilder().setQueryState(QueryState.COMPLETED).build(); + QueryResult result = QueryResult.newBuilder() // + .setQueryState(QueryState.COMPLETED) // + .setQueryId(queryId) // + .build(); foreman.cleanupAndSendResult(result); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 7a99c3f..fc5bc81 100644 --- a/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/sandbox/prototype/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -103,17 +103,16 @@ public class ParquetRecordReaderTest { } @Test - @Ignore public void testLocalDistributed() throws Exception { String planName = "/parquet/parquet_scan_union_screen_physical.json"; - testParquetFullEngineLocalTextDistributed(planName, fileName, 1, 20, 300000); + testParquetFullEngineLocalTextDistributed(planName, fileName, 1, numberRowGroups, recordsPerRowGroup); } @Test @Ignore public void testRemoteDistributed() throws Exception { String planName = "/parquet/parquet_scan_union_screen_physical.json"; - testParquetFullEngineRemote(planName, fileName, 1, 10, 30000); + testParquetFullEngineRemote(planName, fileName, 1, numberRowGroups, recordsPerRowGroup); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java index 9aea930..353e66d 100644 --- a/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java +++ b/sandbox/prototype/exec/ref/src/test/java/org/apache/drill/exec/ref/rops/ConstantROPTest.java @@ -1,24 +1,25 @@ package org.apache.drill.exec.ref.rops; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Charsets; -import com.google.common.io.Files; +import java.util.Collection; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.data.Constant; import org.apache.drill.common.util.FileUtils; -import org.apache.drill.exec.ref.*; +import org.apache.drill.exec.ref.IteratorRegistry; +import org.apache.drill.exec.ref.RecordIterator; +import org.apache.drill.exec.ref.RecordPointer; +import org.apache.drill.exec.ref.ReferenceInterpreter; +import org.apache.drill.exec.ref.RunOutcome; import org.apache.drill.exec.ref.eval.BasicEvaluatorFactory; import org.apache.drill.exec.ref.rse.RSERegistry; import org.apache.drill.exec.ref.values.ScalarValues; +import org.junit.Ignore; import org.junit.Test; - -import java.io.File; -import java.util.Collection; -import java.util.Iterator; +import com.fasterxml.jackson.databind.ObjectMapper; /** * Created with IntelliJ IDEA. @@ -64,9 +65,8 @@ public class ConstantROPTest { // not sure if we want to keep this as a test and check the results. Now that the internals of the ConstantROP work // it might now be worth running the reference intepreter with every build @Test - public void testRefInterp(){ - - try{ + @Ignore // this plan needs to be updated. + public void testRefInterp() throws Exception{ DrillConfig config = DrillConfig.create(); final String jsonFile = "/constant2.json"; LogicalPlan plan = LogicalPlan.parse(config, FileUtils.getResourceAsString(jsonFile)); @@ -89,8 +89,6 @@ public class ConstantROPTest { outcome.exception.printStackTrace(); } } - } catch (Exception e) { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - } + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0fc89a31/sandbox/prototype/exec/ref/src/test/resources/constant2.json ---------------------------------------------------------------------- diff --git a/sandbox/prototype/exec/ref/src/test/resources/constant2.json b/sandbox/prototype/exec/ref/src/test/resources/constant2.json index 31fed5d..bad1aa3 100644 --- a/sandbox/prototype/exec/ref/src/test/resources/constant2.json +++ b/sandbox/prototype/exec/ref/src/test/resources/constant2.json @@ -1,6 +1,6 @@ { head:{ - type:"apache_drill_logical_plan", + type:"APACHE_DRILL_LOGICAL", version:"1", generator:{ type:"manual",
