Updated Branches: refs/heads/master d17f48315 -> dc2e17308
DRILL-164: Add QueryId return in UserResultsListener so that the REST API can use it for external query management. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/dc2e1730 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/dc2e1730 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/dc2e1730 Branch: refs/heads/master Commit: dc2e17308e1be686e3422320c26249cbc8728047 Parents: d17f483 Author: Jacques Nadeau <[email protected]> Authored: Mon Nov 11 10:54:04 2013 -0800 Committer: Jacques Nadeau <[email protected]> Committed: Mon Nov 11 10:54:04 2013 -0800 ---------------------------------------------------------------------- .../java/org/apache/drill/exec/client/DrillClient.java | 5 +++++ .../org/apache/drill/exec/client/QuerySubmitter.java | 5 +++++ .../apache/drill/exec/rpc/user/QueryResultHandler.java | 6 ++++++ .../apache/drill/exec/rpc/user/UserResultsListener.java | 7 ++----- .../drill/exec/store/ParquetRecordReaderTest.java | 12 +++++++----- .../drill/exec/store/TestParquetPhysicalPlan.java | 6 ++++++ .../exec/store/parquet/ParquetRecordReaderTest.java | 5 +++++ .../org/apache/drill/sql/client/full/BatchListener.java | 5 +++++ 8 files changed, 41 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 4d6088d..8ee9042 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -37,6 +37,7 @@ import 
org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.memory.DirectBufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.QueryType; import org.apache.drill.exec.rpc.*; @@ -231,6 +232,10 @@ public class DrillClient implements Closeable{ throw RpcException.mapException(t); } } + + @Override + public void queryIdArrived(QueryId queryId) { + } } private class FutureHandler extends AbstractCheckedFuture<Void, RpcException> implements RpcConnectionHandler<ServerConnection>, DrillRpcFuture<Void>{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java index 554fad0..7adefdb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/QuerySubmitter.java @@ -31,6 +31,7 @@ import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.ZKClusterCoordinator; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -173,5 +174,9 @@ public class QuerySubmitter { latch.await(); return count.get(); } + + @Override + public void queryIdArrived(QueryId queryId) { + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index e1e1930..5b4a504 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -147,6 +147,11 @@ public class QueryResultHandler { return finished; } + + @Override + public void queryIdArrived(QueryId queryId) { + } + } private class SubmissionListener extends BaseRpcOutcomeListener<QueryId> { @@ -164,6 +169,7 @@ public class QueryResultHandler { @Override public void success(QueryId queryId, ByteBuf buf) { + listener.queryIdArrived(queryId); logger.debug("Received QueryId {} succesfully. 
Adding listener {}", queryId, listener); UserResultsListener oldListener = resultsListener.putIfAbsent(queryId, listener); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java index f077151..3bcd0cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserResultsListener.java @@ -17,15 +17,12 @@ */ package org.apache.drill.exec.rpc.user; -import java.util.concurrent.Future; - -import org.apache.drill.exec.proto.UserProtos.QueryResult; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.RpcException; -import com.google.common.util.concurrent.SettableFuture; - public interface UserResultsListener { + public abstract void queryIdArrived(QueryId queryId); public abstract void submissionFailed(RpcException ex); public abstract void resultArrived(QueryResultBatch result); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java index 38a624c..733cb1d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/ParquetRecordReaderTest.java @@ -20,15 +20,15 @@ package org.apache.drill.exec.store; import 
com.beust.jcommander.internal.Lists; import com.google.common.base.Charsets; import com.google.common.io.Files; - import com.google.common.util.concurrent.SettableFuture; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.physical.impl.OutputMutator; - +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; @@ -38,7 +38,6 @@ import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.RemoteServiceSet; - import org.apache.drill.exec.store.json.JsonSchemaProvider; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; @@ -47,9 +46,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Ignore; import org.junit.Test; + import parquet.bytes.BytesInput; import parquet.column.ColumnDescriptor; - import parquet.hadoop.ParquetFileWriter; import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.MessageType; @@ -58,7 +57,6 @@ import parquet.schema.MessageTypeParser; import java.util.*; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; import static parquet.column.Encoding.PLAIN; public class ParquetRecordReaderTest { @@ -454,6 +452,10 @@ public class ParquetRecordReaderTest { throw RpcException.mapException(t); } } + + @Override + public void queryIdArrived(QueryId queryId) { + } } // specific tests should call this method, but it is not marked as a test itself intentionally 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java index bdbccf5..b765ed0 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/TestParquetPhysicalPlan.java @@ -20,8 +20,10 @@ package org.apache.drill.exec.store; import com.google.common.base.Charsets; import com.google.common.base.Stopwatch; import com.google.common.io.Resources; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -103,6 +105,10 @@ public class TestParquetPhysicalPlan { latch.await(); return count.get(); } + + @Override + public void queryIdArrived(QueryId queryId) { + } } @Test @Ignore http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java index 1397bd7..ab29a9f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java @@ -35,6 +35,7 @@ import 
org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.client.DrillClient; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatchLoader; @@ -234,6 +235,10 @@ public class ParquetRecordReaderTest { throw RpcException.mapException(t); } } + + @Override + public void queryIdArrived(QueryId queryId) { + } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/dc2e1730/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java ---------------------------------------------------------------------- diff --git a/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java b/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java index 566b502..30a31b2 100644 --- a/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java +++ b/sqlparser/src/main/java/org/apache/drill/sql/client/full/BatchListener.java @@ -21,6 +21,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.QueryResultBatch; import org.apache.drill.exec.rpc.user.UserResultsListener; @@ -70,5 +71,9 @@ public class BatchListener implements UserResultsListener { } } + @Override + public void queryIdArrived(QueryId queryId) { + } + }
