Repository: drill Updated Branches: refs/heads/master d7e37f45a -> 49042bca0
DRILL-2613: 1-Hygiene/prep.: Split DrillResultSet impl. vs. intf.; renaming. Code "hygiene": - Renamed parameter index to rowOffset, and documented it, in SqlAccessor. - Renamed index to rowOffset in AbstractSqlAccessor and BoundCheckingAccessor too. - Fixed a cast to DrillResultSet to be a ResultSet.unwrap(...) call. Preparation: Split DrillResultSet into interface vs. implementation: - Moved old implementation class org.apache.drill.jdbc.DrillResultSet to new implementation class org.apache.drill.jdbc.impl.DrillResultSetImpl. - Created new interface org.apache.drill.jdbc.DrillResultSet, declaring method getQueryId(...). - Relatedly, in nearby JDBC code: - Renamed various references. - Added some "public" qualifiers. Most are only for the interim until DRILL-2089 (moving other implementation classes from org.apache.drill.jdbc to org.apache.drill.jdbc.impl) is completed sufficiently. (See marking with "DRILL-2089".)n - (Files: old DrillResultSet, DrillResultSetImpl, new DrillResultSet; DrillConnectionImpl, DrillCursor, DrillJdbc41Factory, MetaImpl; JdbcTestQueryBase.) Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/690ffa9c Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/690ffa9c Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/690ffa9c Branch: refs/heads/master Commit: 690ffa9cf142d4c2f15a99434c0eb04c0a55ba64 Parents: d7e37f4 Author: dbarclay <dbarc...@maprtech.com> Authored: Wed Apr 8 23:26:18 2015 -0700 Committer: Parth Chandra <pchan...@maprtech.com> Committed: Mon Apr 13 09:36:37 2015 -0700 ---------------------------------------------------------------------- .../vector/accessor/AbstractSqlAccessor.java | 36 +-- .../vector/accessor/BoundCheckingAccessor.java | 74 +++--- .../drill/exec/vector/accessor/SqlAccessor.java | 41 +-- .../apache/drill/jdbc/DrillConnectionImpl.java | 6 +- .../java/org/apache/drill/jdbc/DrillCursor.java | 9 +- .../apache/drill/jdbc/DrillJdbc41Factory.java | 6 +- .../org/apache/drill/jdbc/DrillResultSet.java | 242 ++--------------- .../java/org/apache/drill/jdbc/MetaImpl.java | 4 +- .../drill/jdbc/impl/DrillResultSetImpl.java | 263 +++++++++++++++++++ .../drill/jdbc/test/JdbcTestQueryBase.java | 3 +- 10 files changed, 379 insertions(+), 305 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java index 1605c7d..f7ebfb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/AbstractSqlAccessor.java @@ -30,85 +30,85 @@ import org.apache.drill.common.types.TypeProtos.MajorType; abstract class AbstractSqlAccessor implements SqlAccessor { @Override - public abstract boolean isNull(int index); + public abstract boolean isNull(int rowOffset); @Override - public BigDecimal getBigDecimal(int index) throws InvalidAccessException{ + public BigDecimal getBigDecimal(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("BigDecimal"); } @Override - public boolean getBoolean(int index) throws InvalidAccessException{ + public boolean getBoolean(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("boolean"); } @Override - public byte getByte(int index) throws InvalidAccessException{ + public byte getByte(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("byte"); } @Override - public byte[] getBytes(int index) throws InvalidAccessException{ + public byte[] getBytes(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("byte[]"); } @Override - public Date getDate(int index) throws InvalidAccessException{ + public Date getDate(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("Date"); } @Override - public double getDouble(int index) throws InvalidAccessException{ + public double getDouble(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("double"); } @Override - public float getFloat(int index) throws InvalidAccessException{ + public float getFloat(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("float"); } @Override - public int getInt(int index) throws InvalidAccessException{ + public int getInt(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("int"); } @Override - public long getLong(int index) throws InvalidAccessException{ + public long getLong(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("long"); } @Override - public short getShort(int index) throws InvalidAccessException{ + public short getShort(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("short"); } @Override - public InputStream getStream(int index) throws InvalidAccessException{ + public InputStream getStream(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("InputStream"); } @Override - public char getChar(int index) throws InvalidAccessException{ + public char getChar(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("Char"); } @Override - public Reader getReader(int index) throws InvalidAccessException{ + public Reader getReader(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("Reader"); } @Override - public String getString(int index) throws InvalidAccessException{ - return getObject(index).toString(); + public String getString(int rowOffset) throws InvalidAccessException{ + return getObject(rowOffset).toString(); } @Override - public Time getTime(int index) throws InvalidAccessException{ + public Time getTime(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("Time"); } @Override - public Timestamp getTimestamp(int index) throws InvalidAccessException{ + public Timestamp getTimestamp(int rowOffset) throws InvalidAccessException{ throw new InvalidAccessException("Timestamp"); } http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/BoundCheckingAccessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/BoundCheckingAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/BoundCheckingAccessor.java index c8d6cc7..2838d3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/BoundCheckingAccessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/BoundCheckingAccessor.java @@ -39,88 +39,88 @@ public class BoundCheckingAccessor implements SqlAccessor { } @Override - public boolean isNull(int index) { - return delegate.isNull(index); + public boolean isNull(int rowOffset) { + return delegate.isNull(rowOffset); } @Override - public BigDecimal getBigDecimal(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getBigDecimal(index); + public BigDecimal getBigDecimal(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getBigDecimal(rowOffset); } @Override - public boolean getBoolean(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getBoolean(index); + public boolean getBoolean(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getBoolean(rowOffset); } @Override - public byte getByte(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getByte(index); + public byte getByte(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getByte(rowOffset); } @Override - public byte[] getBytes(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getBytes(index); + public byte[] getBytes(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getBytes(rowOffset); } @Override - public Date getDate(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getDate(index); + public Date getDate(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getDate(rowOffset); } @Override - public double getDouble(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getDouble(index); + public double getDouble(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getDouble(rowOffset); } @Override - public float getFloat(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getFloat(index); + public float getFloat(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getFloat(rowOffset); } @Override - public char getChar(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getChar(index); + public char getChar(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getChar(rowOffset); } @Override - public int getInt(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getInt(index); + public int getInt(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getInt(rowOffset); } @Override - public long getLong(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getLong(index); + public long getLong(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getLong(rowOffset); } @Override - public short getShort(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getShort(index); + public short getShort(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getShort(rowOffset); } @Override - public InputStream getStream(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getStream(index); + public InputStream getStream(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getStream(rowOffset); } @Override - public Reader getReader(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getReader(index); + public Reader getReader(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getReader(rowOffset); } @Override - public String getString(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getString(index); + public String getString(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getString(rowOffset); } @Override - public Time getTime(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getTime(index); + public Time getTime(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getTime(rowOffset); } @Override - public Timestamp getTimestamp(int index) throws AbstractSqlAccessor.InvalidAccessException { - return delegate.getTimestamp(index); + public Timestamp getTimestamp(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { + return delegate.getTimestamp(rowOffset); } /** @@ -129,12 +129,12 @@ public class BoundCheckingAccessor implements SqlAccessor { * @see org.apache.drill.exec.vector.accessor.SqlAccessor#getObject(int) */ @Override - public Object getObject(int index) throws AbstractSqlAccessor.InvalidAccessException { + public Object getObject(int rowOffset) throws AbstractSqlAccessor.InvalidAccessException { // In case some vectors have fewer values than others, and callee invokes // this method with index >= getValueCount(), this should still yield null. final ValueVector.Accessor accessor = vector.getAccessor(); - if (index < accessor.getValueCount()) { - return delegate.getObject(index); + if (rowOffset < accessor.getValueCount()) { + return delegate.getObject(rowOffset); } return null; } http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java index 6007bf4..db10e64 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/accessor/SqlAccessor.java @@ -28,7 +28,6 @@ import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessEx // TODO: Doc.: Document more of basics of pattern of contracts for getXxx(...): -// - What index is (especially since is not 1-based JDBC/SQL column index). // - What constitutes invalid access (that throws InvalidAccessException): // - Does it include out-of-bound index values? (The lack of "throws // InvalidAccessException" on isNull(...) suggests no, but ...) @@ -36,6 +35,10 @@ import org.apache.drill.exec.vector.accessor.AbstractSqlAccessor.InvalidAccessEx /** * Column-data accessor that implements JDBC's Java-null--when--SQL-NULL mapping. * <p> + * Each {@code rowOffset} parameter specifies the (zero-based) offset (in rows) + * of the requested value. + * </p> + * <p> * When the requested value is logically a SQL NULL: * </p> * <li> @@ -56,60 +59,60 @@ public interface SqlAccessor { /** * Reports whether the logical value is a SQL NULL. */ - boolean isNull(int index); + boolean isNull(int rowOffset); /** (See {@link SqlAccessor class description}.) */ - BigDecimal getBigDecimal(int index) throws InvalidAccessException; + BigDecimal getBigDecimal(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - boolean getBoolean(int index) throws InvalidAccessException; + boolean getBoolean(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - byte getByte(int index) throws InvalidAccessException; + byte getByte(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - byte[] getBytes(int index) throws InvalidAccessException; + byte[] getBytes(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - Date getDate(int index) throws InvalidAccessException; + Date getDate(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - double getDouble(int index) throws InvalidAccessException; + double getDouble(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - float getFloat(int index) throws InvalidAccessException; + float getFloat(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - char getChar(int index) throws InvalidAccessException; + char getChar(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - int getInt(int index) throws InvalidAccessException; + int getInt(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - long getLong(int index) throws InvalidAccessException; + long getLong(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - short getShort(int index) throws InvalidAccessException; + short getShort(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - InputStream getStream(int index) throws InvalidAccessException; + InputStream getStream(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - Reader getReader(int index) throws InvalidAccessException; + Reader getReader(int rowOffset) throws InvalidAccessException; // TODO: Doc./Spec.: What should happen if called on non-string type? (Most // are convertible to string. Does that result in error or conversion?) // Similar question for many other methods. /** (See {@link SqlAccessor class description}.) */ - String getString(int index) throws InvalidAccessException; + String getString(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - Time getTime(int index) throws InvalidAccessException; + Time getTime(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - Timestamp getTimestamp(int index) throws InvalidAccessException; + Timestamp getTimestamp(int rowOffset) throws InvalidAccessException; /** (See {@link SqlAccessor class description}.) */ - Object getObject(int index) throws InvalidAccessException; + Object getObject(int rowOffset) throws InvalidAccessException; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java index b18cb75..3fdbf84 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java @@ -45,6 +45,7 @@ import org.apache.drill.exec.store.dfs.FileSystemPlugin; import org.apache.drill.exec.store.dfs.WorkspaceConfig; import org.apache.drill.exec.util.TestUtilities; +// (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) /** * Implementation of JDBC connection in Drill. * @@ -52,7 +53,7 @@ import org.apache.drill.exec.util.TestUtilities; * Abstract to allow newer versions of JDBC to add methods. * </p> */ -abstract class DrillConnectionImpl extends AvaticaConnection implements DrillConnection { +public abstract class DrillConnectionImpl extends AvaticaConnection implements DrillConnection { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConnection.class); final DrillStatementRegistry openStatementsRegistry = new DrillStatementRegistry(); @@ -181,8 +182,9 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements DrillCon return config.getTimeZone(); } + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) // do not make public - UnregisteredDriver getDriver() { + public UnregisteredDriver getDriver() { return driver; } http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java index 3b38a09..d666d06 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillCursor.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.jdbc.impl.DrillResultSetImpl; public class DrillCursor implements Cursor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillCursor.class); @@ -37,10 +38,10 @@ public class DrillCursor implements Cursor { private static final String UNKNOWN = "--UNKNOWN--"; /** The associated java.sql.ResultSet implementation. */ - private final DrillResultSet resultSet; + private final DrillResultSetImpl resultSet; private final RecordBatchLoader currentBatch; - private final DrillResultSet.ResultsListener resultsListener; + private final DrillResultSetImpl.ResultsListener resultsListener; // TODO: Doc.: Say what's started (set of rows? just current result batch?) private boolean started = false; @@ -63,13 +64,13 @@ public class DrillCursor implements Cursor { * * @param resultSet the associated ResultSet implementation */ - public DrillCursor(final DrillResultSet resultSet) { + public DrillCursor(final DrillResultSetImpl resultSet) { this.resultSet = resultSet; currentBatch = resultSet.currentBatch; resultsListener = resultSet.resultslistener; } - public DrillResultSet getResultSet() { + public DrillResultSetImpl getResultSet() { return resultSet; } http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java index 4da26e9..a4a97fd 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillJdbc41Factory.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Properties; import java.util.TimeZone; +import org.apache.drill.jdbc.impl.DrillResultSetImpl; + import net.hydromatic.avatica.AvaticaConnection; import net.hydromatic.avatica.AvaticaDatabaseMetaData; import net.hydromatic.avatica.AvaticaPrepareResult; @@ -79,9 +81,9 @@ public class DrillJdbc41Factory extends DrillFactory { } @Override - public DrillResultSet newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) { + public DrillResultSetImpl newResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, TimeZone timeZone) { final ResultSetMetaData metaData = newResultSetMetaData(statement, prepareResult.getColumnList()); - return new DrillResultSet(statement, (DrillPrepareResult) prepareResult, metaData, timeZone); + return new DrillResultSetImpl(statement, (DrillPrepareResult) prepareResult, metaData, timeZone); } @Override http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java index 74900bc..36ac8cf 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillResultSet.java @@ -17,229 +17,33 @@ */ package org.apache.drill.jdbc; +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.Ref; +import java.sql.ResultSet; import java.sql.ResultSetMetaData; +import java.sql.RowId; import java.sql.SQLException; -import java.util.TimeZone; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Map; -import net.hydromatic.avatica.AvaticaPrepareResult; -import net.hydromatic.avatica.AvaticaResultSet; -import net.hydromatic.avatica.AvaticaStatement; +public interface DrillResultSet extends ResultSet { -import org.apache.drill.exec.client.DrillClient; -import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.UserBitShared.QueryType; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.user.ConnectionThrottle; -import org.apache.drill.exec.rpc.user.QueryDataBatch; -import org.apache.drill.exec.rpc.user.UserResultsListener; + String getQueryId(); -import com.google.common.collect.Queues; - -public class DrillResultSet extends AvaticaResultSet { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSet.class); - - SchemaChangeListener changeListener; - final ResultsListener resultslistener = new ResultsListener(); - private volatile QueryId queryId; - private final DrillClient client; - final RecordBatchLoader currentBatch; - final DrillCursor cursor; - - public DrillResultSet(AvaticaStatement statement, AvaticaPrepareResult prepareResult, - ResultSetMetaData resultSetMetaData, TimeZone timeZone) { - super(statement, prepareResult, resultSetMetaData, timeZone); - DrillConnection c = (DrillConnection) statement.getConnection(); - DrillClient client = c.getClient(); - // DrillClient client, DrillStatement statement) { - currentBatch = new RecordBatchLoader(client.getAllocator()); - this.client = client; - cursor = new DrillCursor(this); - } - - /** - * Throws AlreadyClosedSqlException if this ResultSet is closed. - * - * @throws AlreadyClosedSqlException if ResultSe is closed - * @throws SQLException if error in calling {@link #isClosed()} - */ - private void checkNotClosed() throws SQLException { - if ( isClosed() ) { - throw new AlreadyClosedSqlException( "ResultSet is already closed." ); - } - } - - @Override - public ResultSetMetaData getMetaData() throws SQLException { - checkNotClosed(); - return super.getMetaData(); - } - - - @Override - protected void cancel() { - cleanup(); - close(); - } - - synchronized void cleanup() { - if (queryId != null && ! resultslistener.completed) { - client.cancelQuery(queryId); - } - resultslistener.close(); - currentBatch.clear(); - } - - @Override - public boolean next() throws SQLException { - checkNotClosed(); - // Next may be called after close has been called (for example after a user cancel) which in turn - // sets the cursor to null. So we must check before we call next. - // TODO: handle next() after close is called in the Avatica code. - if (super.cursor != null) { - return super.next(); - } else { - return false; - } - } - - @Override - protected DrillResultSet execute() throws SQLException{ - checkNotClosed(); - // Call driver's callback. It is permitted to throw a RuntimeException. - DrillConnectionImpl connection = (DrillConnectionImpl) statement.getConnection(); - - connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(), - resultslistener); - connection.getDriver().handler.onStatementExecute(statement, null); - - super.execute(); - - // don't return with metadata until we've achieved at least one return message. - try { - resultslistener.latch.await(); - cursor.next(); - } catch (InterruptedException e) { - // TODO: Check: Should this call Thread.currentThread.interrupt()? If - // not, at least document why this is empty. - } - - return this; - } - - public String getQueryId() { - if (queryId != null) { - return QueryIdHelper.getQueryId(queryId); - } else { - return null; - } - } - - class ResultsListener implements UserResultsListener { - private static final int MAX = 100; - private volatile RpcException ex; - volatile boolean completed = false; - private volatile boolean autoread = true; - private volatile ConnectionThrottle throttle; - private volatile boolean closed = false; - private CountDownLatch latch = new CountDownLatch(1); - private AtomicBoolean receivedMessage = new AtomicBoolean(false); - - - - final LinkedBlockingDeque<QueryDataBatch> queue = Queues.newLinkedBlockingDeque(); - - // TODO: Doc.: Release what if what is first relative to what? - private boolean releaseIfFirst() { - if (receivedMessage.compareAndSet(false, true)) { - latch.countDown(); - return true; - } - - return false; - } - - @Override - public void submissionFailed(RpcException ex) { - this.ex = ex; - completed = true; - close(); - System.out.println("Query failed: " + ex.getMessage()); - } - - @Override - public void queryCompleted() { - releaseIfFirst(); - completed = true; - } - - @Override - public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { - logger.debug("Result arrived {}", result); - - // If we're in a closed state, just release the message. - if (closed) { - result.release(); - completed = true; - return; - } - - // We're active; let's add to the queue. - queue.add(result); - if (queue.size() >= MAX - 1) { - throttle.setAutoRead(false); - this.throttle = throttle; - autoread = false; - } - - releaseIfFirst(); - } - - // TODO: Doc.: Specify whether result can be null and what that means. - public QueryDataBatch getNext() throws RpcException, InterruptedException { - while (true) { - if (ex != null) { - throw ex; - } - if (completed && queue.isEmpty()) { - return null; - } else { - QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS); - if (q != null) { - if (!autoread && queue.size() < MAX / 2) { - autoread = true; - throttle.setAutoRead(true); - throttle = null; - } - return q; - } - } - } - } - - void close() { - closed = true; - while (!queue.isEmpty()) { - QueryDataBatch qrb = queue.poll(); - if (qrb != null && qrb.getData() != null) { - qrb.getData().release(); - } - } - // close may be called before the first result is received and the main thread is blocked waiting - // for the result. In that case we want to unblock the main thread. - latch.countDown(); - completed = true; - } - - @Override - public void queryIdArrived(QueryId queryId) { - DrillResultSet.this.queryId = queryId; - } - } } + + \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/MetaImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/MetaImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/MetaImpl.java index 4ff626e..78ca221 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/MetaImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/MetaImpl.java @@ -20,7 +20,6 @@ package org.apache.drill.jdbc; import java.sql.DatabaseMetaData; import java.sql.ResultSet; import java.sql.Types; - import java.util.List; import net.hydromatic.avatica.AvaticaPrepareResult; @@ -31,6 +30,7 @@ import net.hydromatic.avatica.Meta; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.util.DrillStringUtils; +import org.apache.drill.jdbc.impl.DrillResultSetImpl; public class MetaImpl implements Meta { @@ -619,7 +619,7 @@ public class MetaImpl implements Meta { } public Cursor createCursor(AvaticaResultSet resultSet_) { - return ((DrillResultSet) resultSet_).cursor; + return ((DrillResultSetImpl) resultSet_).cursor; } public AvaticaPrepareResult prepare(AvaticaStatement statement_, String sql) { http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java new file mode 100644 index 0000000..24ef62b --- /dev/null +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillResultSetImpl.java @@ -0,0 +1,263 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.jdbc.impl; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.TimeZone; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import net.hydromatic.avatica.AvaticaPrepareResult; +import net.hydromatic.avatica.AvaticaResultSet; +import net.hydromatic.avatica.AvaticaStatement; + +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; +import org.apache.drill.exec.rpc.user.QueryDataBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.jdbc.AlreadyClosedSqlException; +import org.apache.drill.jdbc.DrillConnection; +import org.apache.drill.jdbc.DrillConnectionImpl; +import org.apache.drill.jdbc.DrillCursor; +import org.apache.drill.jdbc.DrillResultSet; +import org.apache.drill.jdbc.SchemaChangeListener; + +import com.google.common.collect.Queues; + + +//???? Split this into interface org.apache.drill.jdbc.DrillResultSet for published +// interface and a class probably named org.apache.drill.jdbc.impl.DrillResultSetImpl. +// Add any needed documentation of Drill-specific behavior of JDBC-defined +// ResultSet methods to new DrillResultSet. ... + +public class DrillResultSetImpl extends AvaticaResultSet implements DrillResultSet { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillResultSetImpl.class); + + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public SchemaChangeListener changeListener; + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public final ResultsListener resultslistener = new ResultsListener(); + private volatile QueryId queryId; + private final DrillClient client; + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public final RecordBatchLoader currentBatch; + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public final DrillCursor cursor; + + public DrillResultSetImpl(AvaticaStatement statement, AvaticaPrepareResult prepareResult, + ResultSetMetaData resultSetMetaData, TimeZone timeZone) { + super(statement, prepareResult, resultSetMetaData, timeZone); + DrillConnection c = (DrillConnection) statement.getConnection(); + DrillClient client = c.getClient(); + // DrillClient client, DrillStatement statement) { + currentBatch = new RecordBatchLoader(client.getAllocator()); + this.client = client; + cursor = new DrillCursor(this); + } + + /** + * Throws AlreadyClosedSqlException if this ResultSet is closed. + * + * @throws AlreadyClosedSqlException if ResultSet is closed + * @throws SQLException if error in calling {@link #isClosed()} + */ + private void checkNotClosed() throws SQLException { + if ( isClosed() ) { + throw new AlreadyClosedSqlException( "ResultSet is already closed." ); + } + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + checkNotClosed(); + return super.getMetaData(); + } + + + @Override + protected void cancel() { + cleanup(); + close(); + } + + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public synchronized void cleanup() { + if (queryId != null && ! resultslistener.completed) { + client.cancelQuery(queryId); + } + resultslistener.close(); + currentBatch.clear(); + } + + @Override + public boolean next() throws SQLException { + checkNotClosed(); + // Next may be called after close has been called (for example after a user cancel) which in turn + // sets the cursor to null. So we must check before we call next. + // TODO: handle next() after close is called in the Avatica code. + if (super.cursor != null) { + return super.next(); + } else { + return false; + } + } + + @Override + protected DrillResultSetImpl execute() throws SQLException{ + checkNotClosed(); + // Call driver's callback. It is permitted to throw a RuntimeException. + DrillConnectionImpl connection = (DrillConnectionImpl) statement.getConnection(); + + connection.getClient().runQuery(QueryType.SQL, this.prepareResult.getSql(), + resultslistener); + connection.getDriver().handler.onStatementExecute(statement, null); + + super.execute(); + + // don't return with metadata until we've achieved at least one return message. + try { + resultslistener.latch.await(); + cursor.next(); + } catch (InterruptedException e) { + // TODO: Check: Should this call Thread.currentThread.interrupt()? If + // not, at least document why this is empty. + } + + return this; + } + + public String getQueryId() { + if (queryId != null) { + return QueryIdHelper.getQueryId(queryId); + } else { + return null; + } + } + + // (Public until JDBC impl. classes moved out of published-intf. package. (DRILL-2089).) + public class ResultsListener implements UserResultsListener { + private static final int MAX = 100; + private volatile RpcException ex; + volatile boolean completed = false; + private volatile boolean autoread = true; + private volatile ConnectionThrottle throttle; + private volatile boolean closed = false; + private CountDownLatch latch = new CountDownLatch(1); + private AtomicBoolean receivedMessage = new AtomicBoolean(false); + + + + final LinkedBlockingDeque<QueryDataBatch> queue = Queues.newLinkedBlockingDeque(); + + // TODO: Doc.: Release what if what is first relative to what? + private boolean releaseIfFirst() { + if (receivedMessage.compareAndSet(false, true)) { + latch.countDown(); + return true; + } + + return false; + } + + @Override + public void submissionFailed(RpcException ex) { + this.ex = ex; + completed = true; + close(); + System.out.println("Query failed: " + ex.getMessage()); + } + + @Override + public void queryCompleted() { + releaseIfFirst(); + completed = true; + } + + @Override + public void dataArrived(QueryDataBatch result, ConnectionThrottle throttle) { + logger.debug("Result arrived {}", result); + + // If we're in a closed state, just release the message. + if (closed) { + result.release(); + completed = true; + return; + } + + // We're active; let's add to the queue. + queue.add(result); + if (queue.size() >= MAX - 1) { + throttle.setAutoRead(false); + this.throttle = throttle; + autoread = false; + } + + releaseIfFirst(); + } + + // TODO: Doc.: Specify whether result can be null and what that means. + public QueryDataBatch getNext() throws RpcException, InterruptedException { + while (true) { + if (ex != null) { + throw ex; + } + if (completed && queue.isEmpty()) { + return null; + } else { + QueryDataBatch q = queue.poll(50, TimeUnit.MILLISECONDS); + if (q != null) { + if (!autoread && queue.size() < MAX / 2) { + autoread = true; + throttle.setAutoRead(true); + throttle = null; + } + return q; + } + } + } + } + + void close() { + closed = true; + while (!queue.isEmpty()) { + QueryDataBatch qrb = queue.poll(); + if (qrb != null && qrb.getData() != null) { + qrb.getData().release(); + } + } + // close may be called before the first result is received and the main thread is blocked waiting + // for the result. In that case we want to unblock the main thread. + latch.countDown(); + completed = true; + } + + @Override + public void queryIdArrived(QueryId queryId) { + DrillResultSetImpl.this.queryId = queryId; + } + } + +} http://git-wip-us.apache.org/repos/asf/drill/blob/690ffa9c/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcTestQueryBase.java ---------------------------------------------------------------------- diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcTestQueryBase.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcTestQueryBase.java index 4fec567..5c0a0e5 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcTestQueryBase.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcTestQueryBase.java @@ -19,7 +19,6 @@ package org.apache.drill.jdbc.test; import java.nio.file.Paths; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.Statement; @@ -53,7 +52,7 @@ public class JdbcTestQueryBase extends JdbcTest { Stopwatch watch = new Stopwatch().start(); Statement s = conn.createStatement(); ResultSet r = s.executeQuery(sql); - System.out.println(String.format("QueryId: %s", ((DrillResultSet) r).getQueryId())); + System.out.println(String.format("QueryId: %s", r.unwrap(DrillResultSet.class).getQueryId())); boolean first = true; while (r.next()) { ResultSetMetaData md = r.getMetaData();