http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java 
b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java
index b573eab..f410dc8 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaJdbc41Factory.java
@@ -84,10 +84,10 @@ class AvaticaJdbc41Factory implements AvaticaFactory {
   }
 
   public AvaticaResultSet newResultSet(AvaticaStatement statement,
-      Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) {
+      QueryState state, Meta.Signature signature, TimeZone timeZone, 
Meta.Frame firstFrame) {
     final ResultSetMetaData metaData =
         newResultSetMetaData(statement, signature);
-    return new AvaticaResultSet(statement, signature, metaData, timeZone,
+    return new AvaticaResultSet(statement, state, signature, metaData, 
timeZone,
         firstFrame);
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
index 9144ed3..4f1c726 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaPreparedStatement.java
@@ -16,6 +16,7 @@
  */
 package org.apache.calcite.avatica;
 
+import org.apache.calcite.avatica.Meta.Signature;
 import org.apache.calcite.avatica.remote.TypedValue;
 
 import java.io.InputStream;
@@ -106,7 +107,9 @@ public abstract class AvaticaPreparedStatement
 
   public ResultSet executeQuery() throws SQLException {
     this.updateCount = -1;
-    return getConnection().executeQueryInternal(this, getSignature(), null);
+    final Signature sig = getSignature();
+    return getConnection().executeQueryInternal(this, sig, null,
+        new QueryState(sig.sql));
   }
 
   public ParameterMetaData getParameterMetaData() throws SQLException {
@@ -118,7 +121,8 @@ public abstract class AvaticaPreparedStatement
   }
 
   public long executeLargeUpdate() throws SQLException {
-    getConnection().executeQueryInternal(this, getSignature(), null);
+    getConnection().executeQueryInternal(this, getSignature(), null,
+        new QueryState(getSignature().sql));
     return updateCount;
   }
 
@@ -199,7 +203,8 @@ public abstract class AvaticaPreparedStatement
 
   public boolean execute() throws SQLException {
     this.updateCount = -1;
-    getConnection().executeQueryInternal(this, getSignature(), null);
+    getConnection().executeQueryInternal(this, getSignature(), null,
+        new QueryState(getSignature().sql));
     // Result set is null for DML or DDL.
     // Result set is closed if user cancelled the query.
     return openResultSet != null && !openResultSet.isClosed();

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java 
b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java
index f7b7ccd..478723f 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaResultSet.java
@@ -49,6 +49,7 @@ import java.util.TimeZone;
  */
 public class AvaticaResultSet implements ResultSet, ArrayImpl.Factory {
   protected final AvaticaStatement statement;
+  protected final QueryState state;
   protected final Meta.Signature signature;
   protected final Meta.Frame firstFrame;
   protected final List<ColumnMetaData> columnMetaDataList;
@@ -69,11 +70,13 @@ public class AvaticaResultSet implements ResultSet, 
ArrayImpl.Factory {
   private Cursor timeoutCursor;
 
   public AvaticaResultSet(AvaticaStatement statement,
+      QueryState state,
       Meta.Signature signature,
       ResultSetMetaData resultSetMetaData,
       TimeZone timeZone,
       Meta.Frame firstFrame) {
     this.statement = statement;
+    this.state = state;
     this.signature = signature;
     this.firstFrame = firstFrame;
     this.columnMetaDataList = signature.columns;
@@ -182,7 +185,7 @@ public class AvaticaResultSet implements ResultSet, 
ArrayImpl.Factory {
   protected AvaticaResultSet execute() throws SQLException {
     final List<TypedValue> parameterValues = 
statement.getBoundParameterValues();
     final Iterable<Object> iterable1 =
-        statement.connection.meta.createIterable(statement.handle, signature,
+        statement.connection.meta.createIterable(statement.handle, state, 
signature,
             parameterValues, firstFrame);
     this.cursor = MetaImpl.createCursor(signature.cursorFactory, iterable1);
     this.accessorList =

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java 
b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
index 6bcf8eb..ca73f43 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/AvaticaStatement.java
@@ -104,7 +104,7 @@ public abstract class AvaticaStatement
     this.signature = signature;
     this.closed = false;
     if (h == null) {
-      final Meta.ConnectionHandle ch = new 
Meta.ConnectionHandle(connection.id);
+      final Meta.ConnectionHandle ch = connection.handle;
       h = connection.meta.createStatement(ch);
     }
     connection.statementMap.put(h.id, this);
@@ -130,12 +130,44 @@ public abstract class AvaticaStatement
     try {
       // In JDBC, maxRowCount = 0 means no limit; in prepare it means LIMIT 0
       final long maxRowCount1 = maxRowCount <= 0 ? -1 : maxRowCount;
-      Meta.ExecuteResult x =
-          connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+      for (int i = 0; i < connection.maxRetriesPerExecute; i++) {
+        try {
+          Meta.ExecuteResult x =
+              connection.prepareAndExecuteInternal(this, sql, maxRowCount1);
+          return;
+        } catch (NoSuchStatementException e) {
+          resetStatement();
+        }
+      }
     } catch (RuntimeException e) {
       throw connection.helper.createException("Error while executing SQL \"" + 
sql + "\": "
           + e.getMessage(), e);
     }
+
+    throw new RuntimeException("Failed to successfully execute query after "
+        + connection.maxRetriesPerExecute + " attempts.");
+  }
+
+  protected void resetStatement() {
+    // Invalidate the old statement
+    connection.statementMap.remove(handle.id);
+    // Get a new one
+    final Meta.ConnectionHandle ch = new Meta.ConnectionHandle(connection.id);
+    Meta.StatementHandle h = connection.meta.createStatement(ch);
+    // Cache it in the connection
+    connection.statementMap.put(h.id, this);
+    // Update the local state and try again
+    this.handle = h;
+  }
+
+  /**
+   * Re-initialize the ResultSet on the server with the given state.
+   * @param state The ResultSet's state.
+   * @param offset Offset into the desired ResultSet
+   * @return True if the ResultSet has more results, false if there are no 
more results.
+   */
+  protected boolean syncResults(QueryState state, long offset) throws 
NoSuchStatementException {
+    return connection.meta.syncResults(handle, state, offset);
   }
 
   // implement Statement
@@ -447,7 +479,7 @@ public abstract class AvaticaStatement
    */
   protected ResultSet executeQueryInternal(Meta.Signature signature)
       throws SQLException {
-    return connection.executeQueryInternal(this, signature, null);
+    return connection.executeQueryInternal(this, signature, null, null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java 
b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
index 6c51b1c..5ddacdb 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/Meta.java
@@ -34,6 +34,7 @@ import java.lang.reflect.Method;
 import java.math.BigDecimal;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -204,7 +205,7 @@ public interface Meta {
    * <p>The default implementation just returns {@code iterable}, which it
    * requires to be not null; derived classes may instead choose to execute the
    * relational expression in {@code signature}. */
-  Iterable<Object> createIterable(StatementHandle handle, Signature signature,
+  Iterable<Object> createIterable(StatementHandle stmt, QueryState state, 
Signature signature,
       List<TypedValue> parameterValues, Frame firstFrame);
 
   /** Prepares a statement.
@@ -227,7 +228,7 @@ public interface Meta {
    *     first frame of data
    */
   ExecuteResult prepareAndExecute(StatementHandle h, String sql,
-      long maxRowCount, PrepareCallback callback);
+      long maxRowCount, PrepareCallback callback) throws 
NoSuchStatementException;
 
   /** Returns a frame of rows.
    *
@@ -243,7 +244,8 @@ public interface Meta {
    * no limit
    * @return Frame, or null if there are no more
    */
-  Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount);
+  Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws
+      NoSuchStatementException, MissingResultsException;
 
   /** Executes a prepared statement.
    *
@@ -254,7 +256,7 @@ public interface Meta {
    * @return Frame, or null if there are no more
    */
   ExecuteResult execute(StatementHandle h, List<TypedValue> parameterValues,
-      long maxRowCount);
+      long maxRowCount) throws NoSuchStatementException;
 
   /** Called during the creation of a statement to allocate a new handle.
    *
@@ -280,6 +282,13 @@ public interface Meta {
   /** Closes a connection */
   void closeConnection(ConnectionHandle ch);
 
+  /**
+   * Re-set the {@link ResultSet} on a Statement. Not a JDBC method.
+   * @return True if there are results to fetch after resetting to the given 
offset. False otherwise
+   */
+  boolean syncResults(StatementHandle sh, QueryState state, long offset)
+      throws NoSuchStatementException;
+
   /** Sync client and server view of connection properties.
    *
    * <p>Note: this interface is considered "experimental" and may undergo 
further changes as this

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java 
b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
index b34b325..f4764a1 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/MetaImpl.java
@@ -751,15 +751,23 @@ public abstract class MetaImpl implements Meta {
     return createEmptyResultSet(MetaPseudoColumn.class);
   }
 
-  public Iterable<Object> createIterable(StatementHandle handle,
+  @Override public Iterable<Object> createIterable(StatementHandle handle, 
QueryState state,
       Signature signature, List<TypedValue> parameterValues, Frame firstFrame) 
{
     if (firstFrame != null && firstFrame.done) {
       return firstFrame.rows;
     }
-    return new FetchIterable(handle, firstFrame, parameterValues);
+    AvaticaStatement stmt;
+    try {
+      stmt = connection.lookupStatement(handle);
+    } catch (SQLException e) {
+      throw new RuntimeException(e);
+    }
+    return new FetchIterable(stmt, state,
+        firstFrame, parameterValues);
   }
 
-  public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) {
+  public Frame fetch(AvaticaStatement stmt, List<TypedValue> parameterValues,
+      long offset, int fetchMaxRowCount) throws NoSuchStatementException, 
MissingResultsException {
     return null;
   }
 
@@ -824,33 +832,40 @@ public abstract class MetaImpl implements Meta {
   /** Iterable that yields an iterator over rows coming from a sequence of
    * {@link Frame}s. */
   private class FetchIterable implements Iterable<Object> {
-    private final StatementHandle handle;
+    private final AvaticaStatement stmt;
+    private final QueryState state;
     private final Frame firstFrame;
     private final List<TypedValue> parameterValues;
 
-    public FetchIterable(StatementHandle handle, Frame firstFrame,
+    public FetchIterable(AvaticaStatement stmt, QueryState state, Frame 
firstFrame,
         List<TypedValue> parameterValues) {
-      this.handle = handle;
+      this.stmt = stmt;
+      this.state = state;
       this.firstFrame = firstFrame;
       this.parameterValues = parameterValues;
     }
 
     public Iterator<Object> iterator() {
-      return new FetchIterator(handle, firstFrame, parameterValues);
+      return new FetchIterator(stmt, state, firstFrame, parameterValues);
     }
   }
 
   /** Iterator over rows coming from a sequence of {@link Frame}s. */
   private class FetchIterator implements Iterator<Object> {
-    private final StatementHandle handle;
+    private final AvaticaStatement stmt;
+    private final QueryState state;
     private Frame frame;
     private Iterator<Object> rows;
     private List<TypedValue> parameterValues;
+    private List<TypedValue> originalParameterValues;
+    private long currentOffset = 0;
 
-    public FetchIterator(StatementHandle handle, Frame firstFrame,
+    public FetchIterator(AvaticaStatement stmt, QueryState state, Frame 
firstFrame,
         List<TypedValue> parameterValues) {
-      this.handle = handle;
+      this.stmt = stmt;
+      this.state = state;
       this.parameterValues = parameterValues;
+      this.originalParameterValues = parameterValues;
       if (firstFrame == null) {
         frame = Frame.MORE;
         rows = EmptyIterator.INSTANCE;
@@ -874,6 +889,7 @@ public abstract class MetaImpl implements Meta {
         throw new NoSuchElementException();
       }
       final Object o = rows.next();
+      currentOffset++;
       moveNext();
       return o;
     }
@@ -887,7 +903,31 @@ public abstract class MetaImpl implements Meta {
           rows = null;
           break;
         }
-        frame = fetch(handle, frame.offset, 
AvaticaStatement.DEFAULT_FETCH_SIZE);
+        try {
+          // currentOffset updated after element is read from `rows` iterator
+          frame = fetch(stmt.handle, currentOffset, 
AvaticaStatement.DEFAULT_FETCH_SIZE);
+        } catch (NoSuchStatementException e) {
+          resetStatement();
+          // re-fetch the batch where we left off
+          continue;
+        } catch (MissingResultsException e) {
+          try {
+            // We saw the statement, but it didn't have a resultset 
initialized. So, reset it.
+            if (!stmt.syncResults(state, currentOffset)) {
+              // This returned false, so there aren't actually any more 
results to iterate over
+              frame = null;
+              rows = null;
+              break;
+            }
+            // syncResults returning true means we need to fetch those results
+          } catch (NoSuchStatementException e1) {
+            // Tried to reset the result set but lost the statement; reset it now to save a 
loop before retrying.
+            resetStatement();
+            // The retry will likely hit a MissingResultsException again, but 
the loop handles that on its next pass
+          }
+          // Kick back to the top to try to fetch again (in both branches)
+          continue;
+        }
         parameterValues = null; // don't execute next time
         if (frame == null) {
           rows = null;
@@ -898,6 +938,13 @@ public abstract class MetaImpl implements Meta {
         rows = frame.rows.iterator();
       }
     }
+
+    private void resetStatement() {
+      // If we have to reset the statement, we need to reset the 
parameterValues too
+      parameterValues = originalParameterValues;
+      // Defer to the statement to reset itself
+      stmt.resetStatement();
+    }
   }
 
   /** Returns whether a list of parameter values has any null elements. */

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java 
b/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java
new file mode 100644
index 0000000..7746769
--- /dev/null
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/MissingResultsException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.Meta.StatementHandle;
+
+import java.sql.ResultSet;
+
+/**
+ * An Exception which denotes that a cached Statement is present but has no 
{@link ResultSet}.
+ */
+public class MissingResultsException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  private final StatementHandle handle;
+
+  public MissingResultsException(StatementHandle handle) {
+    this.handle = handle;
+  }
+
+  public StatementHandle getHandle() {
+    return handle;
+  }
+}
+
+// End MissingResultsException.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java
new file mode 100644
index 0000000..b5a940d
--- /dev/null
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchConnectionException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+/**
+ * An Exception that denotes that the given Connection is not cached.
+ */
+public class NoSuchConnectionException extends RuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  private final String connectionId;
+
+  public NoSuchConnectionException(String connectionId) {
+    this.connectionId = connectionId;
+  }
+
+  public String getConnectionId() {
+    return connectionId;
+  }
+}
+
+// End NoSuchConnectionException.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java
new file mode 100644
index 0000000..321011b
--- /dev/null
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/NoSuchStatementException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.Meta.StatementHandle;
+
+/**
+ * An Exception that denotes that the given Statement is not cached.
+ */
+public class NoSuchStatementException extends Exception {
+
+  private static final long serialVersionUID = 1L;
+
+  private final StatementHandle stmtHandle;
+
+  public NoSuchStatementException(StatementHandle stmtHandle) {
+    this.stmtHandle = stmtHandle;
+  }
+
+  public StatementHandle getStatementHandle() {
+    return stmtHandle;
+  }
+}
+
+// End NoSuchStatementException.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java
----------------------------------------------------------------------
diff --git a/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java 
b/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java
new file mode 100644
index 0000000..c702f3c
--- /dev/null
+++ b/avatica/src/main/java/org/apache/calcite/avatica/QueryState.java
@@ -0,0 +1,489 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.proto.Common;
+import org.apache.calcite.avatica.proto.Common.MetaDataOperationArgument;
+import 
org.apache.calcite.avatica.proto.Common.MetaDataOperationArgument.ArgumentType;
+import org.apache.calcite.avatica.remote.MetaDataOperation;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * A struct used to encapsulate the necessary information to reconstitute a 
ResultSet in the
+ * Avatica server.
+ */
+public class QueryState {
+
+  /**
+   * An enumeration that represents how a ResultSet was created.
+   */
+  public static enum StateType {
+    SQL,
+    METADATA;
+
+    public Common.StateType toProto() {
+      switch (this) {
+      case SQL:
+        return Common.StateType.SQL;
+      case METADATA:
+        return Common.StateType.METADATA;
+      default:
+        return Common.StateType.UNRECOGNIZED;
+      }
+    }
+
+    public static StateType fromProto(Common.StateType protoType) {
+      switch (protoType) {
+      case SQL:
+        return StateType.SQL;
+      case METADATA:
+        return StateType.METADATA;
+      default:
+        throw new IllegalArgumentException("Unhandled StateType " + protoType);
+      }
+    }
+  }
+
+  @JsonProperty("type")
+  public final StateType type;
+
+  @JsonProperty("sql")
+  public final String sql;
+
+  @JsonProperty("metaDataOperation")
+  public final MetaDataOperation metaDataOperation;
+  @JsonProperty("operationArgs")
+  public final Object[] operationArgs;
+
+  /**
+   * Constructor encapsulating a SQL query used to create a result set.
+   *
+   * @param sql The SQL query.
+   */
+  public QueryState(String sql) {
+    // This doesn't need to be non-null
+    this.sql = sql;
+    this.type = StateType.SQL;
+
+    // Null out the members we don't use.
+    this.metaDataOperation = null;
+    this.operationArgs = null;
+  }
+
+  /**
+   * Constructor encapsulating a metadata operation's result set.
+   *
+   * @param op A pointer to the {@link DatabaseMetaData} operation being 
invoked.
+   * @param args The arguments to the method being invoked.
+   */
+  public QueryState(MetaDataOperation op, Object... args) {
+    this.metaDataOperation = Objects.requireNonNull(op);
+    this.operationArgs = Arrays.copyOf(Objects.requireNonNull(args), 
args.length);
+    this.type = StateType.METADATA;
+
+    // Null out the members we won't use
+    this.sql = null;
+  }
+
+  /**
+   * Not intended for external use. For Jackson-databind only.
+   */
+  public QueryState(StateType type, String sql, MetaDataOperation op, 
Object... args) {
+    this.type = Objects.requireNonNull(type);
+    switch (type) {
+    case SQL:
+      this.sql = Objects.requireNonNull(sql);
+      if (null != op) {
+        throw new IllegalArgumentException("Expected null MetaDataOperation, 
but got " + op);
+      }
+      this.metaDataOperation = null;
+      if (null != args) {
+        throw new IllegalArgumentException("Expected null arguments, but got "
+            + Arrays.toString(args));
+      }
+      this.operationArgs = null;
+      break;
+    case METADATA:
+      this.metaDataOperation = Objects.requireNonNull(op);
+      this.operationArgs = Objects.requireNonNull(args);
+      if (null != sql) {
+        throw new IllegalArgumentException("Expected null SQl but got " + sql);
+      }
+      this.sql = null;
+      break;
+    default:
+      throw new IllegalArgumentException("Unable to handle StateType " + type);
+    }
+  }
+
+  /**
+   * Not intended for external use. For Jackson-databind only.
+   */
+  public QueryState() {
+    this.sql = null;
+    this.metaDataOperation = null;
+    this.type = null;
+    this.operationArgs = null;
+  }
+
+  /**
+   * @return The {@link StateType} for this encapsulated state.
+   */
+  public StateType getType() {
+    return type;
+  }
+
+  /**
+   * @return The SQL expression to invoke.
+   */
+  public String getSql() {
+    assert type == StateType.SQL;
+    return sql;
+  }
+
+  /**
+   * @return The metadata operation to invoke.
+   */
+  public MetaDataOperation getMetaDataOperation() {
+    assert type == StateType.METADATA;
+    return metaDataOperation;
+  }
+
+  /**
+   * @return The Arguments for the given metadata operation.
+   */
+  public Object[] getOperationArgs() {
+    assert type == StateType.METADATA;
+    return operationArgs;
+  }
+
+  public ResultSet invoke(Connection conn, Statement statement) throws 
SQLException {
+    switch (type) {
+    case SQL:
+      boolean ret = Objects.requireNonNull(statement).execute(sql);
+      ResultSet results = statement.getResultSet();
+
+      // Either execute(sql) returned true or the resultSet was null
+      assert ret || null == results;
+
+      return results;
+    case METADATA:
+      DatabaseMetaData metadata = Objects.requireNonNull(conn).getMetaData();
+      switch (metaDataOperation) {
+      case GET_ATTRIBUTES:
+        verifyOpArgs(4);
+        return metadata.getAttributes((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_BEST_ROW_IDENTIFIER:
+        verifyOpArgs(5);
+        return metadata.getBestRowIdentifier((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (int) operationArgs[3],
+            (boolean) operationArgs[4]);
+      case GET_CATALOGS:
+        verifyOpArgs(0);
+        return metadata.getCatalogs();
+      case GET_COLUMNS:
+        verifyOpArgs(4);
+        return metadata.getColumns((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_COLUMN_PRIVILEGES:
+        verifyOpArgs(4);
+        return metadata.getColumnPrivileges((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_CROSS_REFERENCE:
+        verifyOpArgs(6);
+        return metadata.getCrossReference((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3],
+            (String) operationArgs[4],
+            (String) operationArgs[5]);
+      case GET_EXPORTED_KEYS:
+        verifyOpArgs(3);
+        return metadata.getExportedKeys((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_FUNCTIONS:
+        verifyOpArgs(3);
+        return metadata.getFunctions((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_FUNCTION_COLUMNS:
+        verifyOpArgs(4);
+        return metadata.getFunctionColumns((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_IMPORTED_KEYS:
+        verifyOpArgs(3);
+        return metadata.getImportedKeys((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_INDEX_INFO:
+        verifyOpArgs(5);
+        return metadata.getIndexInfo((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (boolean) operationArgs[3],
+            (boolean) operationArgs[4]);
+      case GET_PRIMARY_KEYS:
+        verifyOpArgs(3);
+        return metadata.getPrimaryKeys((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_PROCEDURES:
+        verifyOpArgs(3);
+        return metadata.getProcedures((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_PROCEDURE_COLUMNS:
+        verifyOpArgs(4);
+        return metadata.getProcedureColumns((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_PSEUDO_COLUMNS:
+        verifyOpArgs(4);
+        return metadata.getPseudoColumns((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String) operationArgs[3]);
+      case GET_SCHEMAS:
+        verifyOpArgs(0);
+        return metadata.getSchemas();
+      case GET_SCHEMAS_WITH_ARGS:
+        verifyOpArgs(2);
+        return metadata.getSchemas((String) operationArgs[0],
+            (String) operationArgs[1]);
+      case GET_SUPER_TABLES:
+        verifyOpArgs(3);
+        return metadata.getSuperTables((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_SUPER_TYPES:
+        verifyOpArgs(3);
+        return metadata.getSuperTypes((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_TABLES:
+        verifyOpArgs(4);
+        return metadata.getTables((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (String[]) operationArgs[3]);
+      case GET_TABLE_PRIVILEGES:
+        verifyOpArgs(3);
+        return metadata.getTablePrivileges((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      case GET_TABLE_TYPES:
+        verifyOpArgs(0);
+        return metadata.getTableTypes();
+      case GET_TYPE_INFO:
+        verifyOpArgs(0);
+        return metadata.getTypeInfo();
+      case GET_UDTS:
+        verifyOpArgs(4);
+        return metadata.getUDTs((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2],
+            (int[]) operationArgs[3]);
+      case GET_VERSION_COLUMNS:
+        verifyOpArgs(3);
+        return metadata.getVersionColumns((String) operationArgs[0],
+            (String) operationArgs[1],
+            (String) operationArgs[2]);
+      default:
+        throw new IllegalArgumentException("Unhandled Metadata operation: " + 
metaDataOperation);
+      }
+    default:
+      throw new IllegalArgumentException("Unable to process QueryState of type 
" + type);
+    }
+  }
+
+  private void verifyOpArgs(int expectedArgs) {
+    if (expectedArgs != operationArgs.length) {
+      throw new RuntimeException("Expected " + expectedArgs + " arguments, but 
got "
+          + Arrays.toString(operationArgs));
+    }
+  }
+
+  /**
+   * Serializes this state into its protocol buffer representation.
+   *
+   * <p>The state type is always written. The SQL string and the metadata
+   * operation are written, together with their {@code hasSql} / {@code hasOp}
+   * flags, only when they are non-null. The {@code hasArgs} flag is always
+   * written and records whether the operation arguments were non-null.
+   *
+   * @return the protobuf message built from this state
+   * @throws IllegalStateException if the state type is neither SQL nor METADATA
+   * @throws RuntimeException if an operation argument is not null, a String,
+   *     an Integer, a Boolean, a {@code String[]} or an {@code int[]}
+   */
+  public Common.QueryState toProto() {
+    final Common.QueryState.Builder proto = Common.QueryState.newBuilder();
+
+    // Required: the state type.
+    switch (type) {
+    case SQL:
+      proto.setType(Common.StateType.SQL);
+      break;
+    case METADATA:
+      proto.setType(Common.StateType.METADATA);
+      break;
+    default:
+      throw new IllegalStateException("Unhandled type: " + type);
+    }
+
+    // Optional: SQL text.
+    if (sql != null) {
+      proto.setSql(sql);
+      proto.setHasSql(true);
+    }
+
+    // Optional: metadata operation.
+    if (metaDataOperation != null) {
+      proto.setOp(metaDataOperation.toProto());
+      proto.setHasOp(true);
+    }
+
+    // Optional: operation arguments. The flag is written in both cases.
+    proto.setHasArgs(operationArgs != null);
+    if (operationArgs != null) {
+      for (Object arg : operationArgs) {
+        final MetaDataOperationArgument.Builder protoArg =
+            MetaDataOperationArgument.newBuilder();
+
+        // Each branch only fills in the argument; it is appended once below.
+        if (arg == null) {
+          protoArg.setType(ArgumentType.NULL);
+        } else if (arg instanceof String) {
+          protoArg.setType(ArgumentType.STRING);
+          protoArg.setStringValue((String) arg);
+        } else if (arg instanceof Integer) {
+          protoArg.setType(ArgumentType.INT);
+          protoArg.setIntValue((int) arg);
+        } else if (arg instanceof Boolean) {
+          protoArg.setType(ArgumentType.BOOL);
+          protoArg.setBoolValue((boolean) arg);
+        } else if (arg instanceof String[]) {
+          protoArg.setType(ArgumentType.REPEATED_STRING);
+          for (String element : (String[]) arg) {
+            protoArg.addStringArrayValues(element);
+          }
+        } else if (arg instanceof int[]) {
+          protoArg.setType(ArgumentType.REPEATED_INT);
+          for (int element : (int[]) arg) {
+            protoArg.addIntArrayValues(element);
+          }
+        } else {
+          throw new RuntimeException("Unexpected operation argument: " + arg.getClass());
+        }
+
+        proto.addArgs(protoArg.build());
+      }
+    }
+
+    return proto.build();
+  }
+
+  /**
+   * Rebuilds a {@code QueryState} from its protocol buffer representation.
+   *
+   * <p>The SQL string, the metadata operation and the operation arguments are
+   * read only when the corresponding {@code hasSql}, {@code hasOp} and
+   * {@code hasArgs} flags are set; otherwise they are left {@code null}.
+   *
+   * @param protoState the protobuf message to read
+   * @return a new state holding the decoded values
+   * @throws RuntimeException if an argument has an unrecognized type
+   */
+  public static QueryState fromProto(Common.QueryState protoState) {
+    final StateType type = StateType.fromProto(protoState.getType());
+
+    String sql = null;
+    if (protoState.getHasSql()) {
+      sql = protoState.getSql();
+    }
+
+    MetaDataOperation op = null;
+    if (protoState.getHasOp()) {
+      op = MetaDataOperation.fromProto(protoState.getOp());
+    }
+
+    Object[] opArgs = null;
+    if (protoState.getHasArgs()) {
+      opArgs = new Object[protoState.getArgsCount()];
+      int index = 0;
+      for (Common.MetaDataOperationArgument protoArg : protoState.getArgsList()) {
+        opArgs[index++] = argumentFromProto(protoArg);
+      }
+    }
+
+    return new QueryState(type, sql, op, opArgs);
+  }
+
+  /**
+   * Decodes one protobuf operation argument into the Java value it carries.
+   *
+   * @param arg the protobuf argument
+   * @return a String, Boolean, Integer, {@code String[]}, {@code int[]} or
+   *     {@code null}, depending on the argument's declared type
+   * @throws RuntimeException if the declared type is not recognized
+   */
+  private static Object argumentFromProto(Common.MetaDataOperationArgument arg) {
+    switch (arg.getType()) {
+    case STRING:
+      return arg.getStringValue();
+    case BOOL:
+      return arg.getBoolValue();
+    case INT:
+      return arg.getIntValue();
+    case REPEATED_STRING:
+      return arg.getStringArrayValuesList().toArray(
+          new String[arg.getStringArrayValuesCount()]);
+    case REPEATED_INT:
+      // Unbox element by element into a primitive array.
+      final int[] values = new int[arg.getIntArrayValuesCount()];
+      int position = 0;
+      for (Integer value : arg.getIntArrayValuesList()) {
+        values[position++] = value;
+      }
+      return values;
+    case NULL:
+      return null;
+    default:
+      throw new RuntimeException("Could not interpret " + arg.getType());
+    }
+  }
+
+  /**
+   * Computes a hash code from the metadata operation, the operation arguments
+   * and the SQL string, the same fields that {@link #equals(Object)} compares.
+   *
+   * <p>The arguments are hashed with {@link Arrays#deepHashCode(Object[])}
+   * because {@code equals} compares them with {@code Arrays.deepEquals}. An
+   * argument may itself be an array (a {@code String[]} or an {@code int[]}),
+   * and a shallow hash would use the identity hash of that nested array, so
+   * two equal states could report different hash codes.
+   *
+   * @return the hash code of this state
+   */
+  @Override public int hashCode() {
+    final int prime = 31;
+    final int opHash = (metaDataOperation == null) ? 0 : metaDataOperation.hashCode();
+    final int sqlHash = (sql == null) ? 0 : sql.hashCode();
+
+    int result = 1;
+    result = prime * result + opHash;
+    // Deep hash keeps this consistent with Arrays.deepEquals in equals().
+    result = prime * result + Arrays.deepHashCode(operationArgs);
+    result = prime * result + sqlHash;
+    return result;
+  }
+
+  /**
+   * Compares this state with another object.
+   *
+   * <p>Two states are equal when they have the same metadata operation,
+   * deeply equal operation arguments and equal (or both null) SQL strings.
+   *
+   * @param obj the object to compare with
+   * @return {@code true} if {@code obj} is a {@code QueryState} equal to this one
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (!(obj instanceof QueryState)) {
+      return false;
+    }
+
+    final QueryState that = (QueryState) obj;
+    return metaDataOperation == that.metaDataOperation
+        && Arrays.deepEquals(operationArgs, that.operationArgs)
+        && (sql == null ? that.sql == null : sql.equals(that.sql));
+  }
+}
+
+// End QueryState.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java 
b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
index 96d2459..b1fe787 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/UnregisteredDriver.java
@@ -43,7 +43,7 @@ import java.util.logging.Logger;
  * <p>The provider must implement:</p>
  * <ul>
  *   <li>{@link Meta#prepare(Meta.ConnectionHandle, String, long)}
- *   <li>{@link 
Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, 
org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)}
+ *   <li>{@link 
Meta#createIterable(org.apache.calcite.avatica.Meta.StatementHandle, 
org.apache.calcite.avatica.QueryState, 
org.apache.calcite.avatica.Meta.Signature, java.util.List, Meta.Frame)}
  * </ul>
  */
 public abstract class UnregisteredDriver implements java.sql.Driver {

Reply via email to