http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
new file mode 100644
index 0000000..12a5b59
--- /dev/null
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.apache.calcite.avatica.proto.Common;
+
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+
+/**
+ * Identifies an operation from {@link DatabaseMetaData} which returns a 
{@link ResultSet}. This
+ * enum is used to allow clients to request the server to re-instantiate a 
{@link ResultSet} for
+ * these operations which do not have a SQL string associated with them as a 
normal query does.
+ */
+public enum MetaDataOperation {
+  GET_ATTRIBUTES,
+  GET_BEST_ROW_IDENTIFIER,
+  GET_CATALOGS,
+  GET_CLIENT_INFO_PROPERTIES,
+  GET_COLUMN_PRIVILEGES,
+  GET_COLUMNS,
+  GET_CROSS_REFERENCE,
+  GET_EXPORTED_KEYS,
+  GET_FUNCTION_COLUMNS,
+  GET_FUNCTIONS,
+  GET_IMPORTED_KEYS,
+  GET_INDEX_INFO,
+  GET_PRIMARY_KEYS,
+  GET_PROCEDURE_COLUMNS,
+  GET_PROCEDURES,
+  GET_PSEUDO_COLUMNS,
+  GET_SCHEMAS,
+  GET_SCHEMAS_WITH_ARGS,
+  GET_SUPER_TABLES,
+  GET_SUPER_TYPES,
+  GET_TABLE_PRIVILEGES,
+  GET_TABLES,
+  GET_TABLE_TYPES,
+  GET_TYPE_INFO,
+  GET_UDTS,
+  GET_VERSION_COLUMNS;
+
+  public Common.MetaDataOperation toProto() {
+    switch (this) {
+    case GET_ATTRIBUTES:
+      return Common.MetaDataOperation.GET_ATTRIBUTES;
+    case GET_BEST_ROW_IDENTIFIER:
+      return Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER;
+    case GET_CATALOGS:
+      return Common.MetaDataOperation.GET_CATALOGS;
+    case GET_CLIENT_INFO_PROPERTIES:
+      return Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES;
+    case GET_COLUMNS:
+      return Common.MetaDataOperation.GET_COLUMNS;
+    case GET_COLUMN_PRIVILEGES:
+      return Common.MetaDataOperation.GET_COLUMN_PRIVILEGES;
+    case GET_CROSS_REFERENCE:
+      return Common.MetaDataOperation.GET_CROSS_REFERENCE;
+    case GET_EXPORTED_KEYS:
+      return Common.MetaDataOperation.GET_EXPORTED_KEYS;
+    case GET_FUNCTIONS:
+      return Common.MetaDataOperation.GET_FUNCTIONS;
+    case GET_FUNCTION_COLUMNS:
+      return Common.MetaDataOperation.GET_FUNCTION_COLUMNS;
+    case GET_IMPORTED_KEYS:
+      return Common.MetaDataOperation.GET_IMPORTED_KEYS;
+    case GET_INDEX_INFO:
+      return Common.MetaDataOperation.GET_INDEX_INFO;
+    case GET_PRIMARY_KEYS:
+      return Common.MetaDataOperation.GET_PRIMARY_KEYS;
+    case GET_PROCEDURES:
+      return Common.MetaDataOperation.GET_PROCEDURES;
+    case GET_PROCEDURE_COLUMNS:
+      return Common.MetaDataOperation.GET_PROCEDURE_COLUMNS;
+    case GET_PSEUDO_COLUMNS:
+      return Common.MetaDataOperation.GET_PSEUDO_COLUMNS;
+    case GET_SCHEMAS:
+      return Common.MetaDataOperation.GET_SCHEMAS;
+    case GET_SCHEMAS_WITH_ARGS:
+      return Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS;
+    case GET_SUPER_TABLES:
+      return Common.MetaDataOperation.GET_SUPER_TABLES;
+    case GET_SUPER_TYPES:
+      return Common.MetaDataOperation.GET_SUPER_TYPES;
+    case GET_TABLES:
+      return Common.MetaDataOperation.GET_TABLES;
+    case GET_TABLE_PRIVILEGES:
+      return Common.MetaDataOperation.GET_TABLE_PRIVILEGES;
+    case GET_TABLE_TYPES:
+      return Common.MetaDataOperation.GET_TABLE_TYPES;
+    case GET_TYPE_INFO:
+      return Common.MetaDataOperation.GET_TYPE_INFO;
+    case GET_UDTS:
+      return Common.MetaDataOperation.GET_UDTS;
+    case GET_VERSION_COLUMNS:
+      return Common.MetaDataOperation.GET_VERSION_COLUMNS;
+    default:
+      throw new RuntimeException("Unknown type: " + this);
+    }
+  }
+
+  public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) {
+    // Null is acceptable
+    if (null == protoOp) {
+      return null;
+    }
+
+    switch (protoOp) {
+    case GET_ATTRIBUTES:
+      return MetaDataOperation.GET_ATTRIBUTES;
+    case GET_BEST_ROW_IDENTIFIER:
+      return MetaDataOperation.GET_BEST_ROW_IDENTIFIER;
+    case GET_CATALOGS:
+      return MetaDataOperation.GET_CATALOGS;
+    case GET_CLIENT_INFO_PROPERTIES:
+      return MetaDataOperation.GET_CLIENT_INFO_PROPERTIES;
+    case GET_COLUMNS:
+      return MetaDataOperation.GET_COLUMNS;
+    case GET_COLUMN_PRIVILEGES:
+      return MetaDataOperation.GET_COLUMN_PRIVILEGES;
+    case GET_CROSS_REFERENCE:
+      return MetaDataOperation.GET_CROSS_REFERENCE;
+    case GET_EXPORTED_KEYS:
+      return MetaDataOperation.GET_EXPORTED_KEYS;
+    case GET_FUNCTIONS:
+      return MetaDataOperation.GET_FUNCTIONS;
+    case GET_FUNCTION_COLUMNS:
+      return MetaDataOperation.GET_FUNCTION_COLUMNS;
+    case GET_IMPORTED_KEYS:
+      return MetaDataOperation.GET_IMPORTED_KEYS;
+    case GET_INDEX_INFO:
+      return MetaDataOperation.GET_INDEX_INFO;
+    case GET_PRIMARY_KEYS:
+      return MetaDataOperation.GET_PRIMARY_KEYS;
+    case GET_PROCEDURES:
+      return MetaDataOperation.GET_PROCEDURES;
+    case GET_PROCEDURE_COLUMNS:
+      return MetaDataOperation.GET_PROCEDURE_COLUMNS;
+    case GET_PSEUDO_COLUMNS:
+      return MetaDataOperation.GET_PSEUDO_COLUMNS;
+    case GET_SCHEMAS:
+      return MetaDataOperation.GET_SCHEMAS;
+    case GET_SCHEMAS_WITH_ARGS:
+      return MetaDataOperation.GET_SCHEMAS_WITH_ARGS;
+    case GET_SUPER_TABLES:
+      return MetaDataOperation.GET_SUPER_TABLES;
+    case GET_SUPER_TYPES:
+      return MetaDataOperation.GET_SUPER_TYPES;
+    case GET_TABLES:
+      return MetaDataOperation.GET_TABLES;
+    case GET_TABLE_PRIVILEGES:
+      return MetaDataOperation.GET_TABLE_PRIVILEGES;
+    case GET_TABLE_TYPES:
+      return MetaDataOperation.GET_TABLE_TYPES;
+    case GET_TYPE_INFO:
+      return MetaDataOperation.GET_TYPE_INFO;
+    case GET_UDTS:
+      return MetaDataOperation.GET_UDTS;
+    case GET_VERSION_COLUMNS:
+      return MetaDataOperation.GET_VERSION_COLUMNS;
+    default:
+      throw new RuntimeException("Unknown type: " + protoOp);
+    }
+  }
+}
+
+// End MetaDataOperation.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
index 2c2c7b2..11a6104 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java
@@ -46,7 +46,7 @@ public class MockJsonService extends JsonService {
   /** Factory that creates a {@code MockJsonService}. */
   public static class Factory implements Service.Factory {
     public Service create(AvaticaConnection connection) {
-      final String connectionId = connection.handle.id;
+      final String connectionId = connection.id;
       final Map<String, String> map1 = new HashMap<>();
       try {
         map1.put(

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
index 3ca21f7..b04980b 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java
@@ -136,7 +136,7 @@ public class MockProtobufService extends ProtobufService {
    */
   public static class MockProtobufServiceFactory implements Service.Factory {
     @Override public Service create(AvaticaConnection connection) {
-      return new MockProtobufService(connection.handle.id);
+      return new MockProtobufService(connection.id);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
index 8414708..8a79fc5 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java
@@ -99,6 +99,10 @@ public abstract class ProtobufService extends 
AbstractService {
     return finagle((ExecuteResponse) _apply(request));
   }
 
+  @Override public SyncResultsResponse apply(SyncResultsRequest request) {
+    return (SyncResultsResponse) _apply(request);
+  }
+
   /**
    * Determines whether the given message has the field, denoted by the 
provided number, set.
    *

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
index 31d522d..9c68beb 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslationImpl.java
@@ -30,6 +30,7 @@ import 
org.apache.calcite.avatica.proto.Requests.OpenConnectionRequest;
 import org.apache.calcite.avatica.proto.Requests.PrepareAndExecuteRequest;
 import org.apache.calcite.avatica.proto.Requests.PrepareRequest;
 import org.apache.calcite.avatica.proto.Requests.SchemasRequest;
+import org.apache.calcite.avatica.proto.Requests.SyncResultsRequest;
 import org.apache.calcite.avatica.proto.Requests.TableTypesRequest;
 import org.apache.calcite.avatica.proto.Requests.TablesRequest;
 import org.apache.calcite.avatica.proto.Requests.TypeInfoRequest;
@@ -44,6 +45,7 @@ import 
org.apache.calcite.avatica.proto.Responses.FetchResponse;
 import org.apache.calcite.avatica.proto.Responses.OpenConnectionResponse;
 import org.apache.calcite.avatica.proto.Responses.PrepareResponse;
 import org.apache.calcite.avatica.proto.Responses.ResultSetResponse;
+import org.apache.calcite.avatica.proto.Responses.SyncResultsResponse;
 import org.apache.calcite.avatica.remote.Service.Request;
 import org.apache.calcite.avatica.remote.Service.Response;
 
@@ -106,6 +108,8 @@ public class ProtobufTranslationImpl implements 
ProtobufTranslation {
         new RequestTranslator(TypeInfoRequest.parser(), new 
Service.TypeInfoRequest()));
     reqParsers.put(ExecuteRequest.class.getName(),
         new RequestTranslator(ExecuteRequest.parser(), new 
Service.ExecuteRequest()));
+    reqParsers.put(SyncResultsRequest.class.getName(),
+        new RequestTranslator(SyncResultsRequest.parser(), new 
Service.SyncResultsRequest()));
 
     REQUEST_PARSERS = Collections.unmodifiableMap(reqParsers);
 
@@ -138,6 +142,8 @@ public class ProtobufTranslationImpl implements 
ProtobufTranslation {
         new ResponseTranslator(ResultSetResponse.parser(), new 
Service.ResultSetResponse()));
     respParsers.put(ErrorResponse.class.getName(),
         new ResponseTranslator(ErrorResponse.parser(), new 
Service.ErrorResponse()));
+    respParsers.put(SyncResultsResponse.class.getName(),
+        new ResponseTranslator(SyncResultsResponse.parser(), new 
Service.SyncResultsResponse()));
 
     RESPONSE_PARSERS = Collections.unmodifiableMap(respParsers);
   }

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
index e82b7ef..e741ffb 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteMeta.java
@@ -17,11 +17,15 @@
 package org.apache.calcite.avatica.remote;
 
 import org.apache.calcite.avatica.AvaticaConnection;
+import org.apache.calcite.avatica.AvaticaConnection.CallableWithoutException;
 import org.apache.calcite.avatica.AvaticaParameter;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.ConnectionPropertiesImpl;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.MissingResultsException;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
 
 import java.sql.SQLException;
 import java.util.ArrayList;
@@ -75,150 +79,296 @@ class RemoteMeta extends MetaImpl {
     }
   }
 
-  @Override public StatementHandle createStatement(ConnectionHandle ch) {
-    connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection 
state if necessary
-    final Service.CreateStatementResponse response =
-        service.apply(new Service.CreateStatementRequest(ch.id));
-    return new StatementHandle(response.connectionId, response.statementId,
-        null);
+  @Override public StatementHandle createStatement(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<StatementHandle>() {
+          public StatementHandle call() {
+            // sync connection state if necessary
+            connectionSync(ch, new ConnectionPropertiesImpl());
+            final Service.CreateStatementResponse response =
+                service.apply(new Service.CreateStatementRequest(ch.id));
+            return new StatementHandle(response.connectionId, 
response.statementId, null);
+          }
+        });
   }
 
-  @Override public void closeStatement(StatementHandle h) {
-    final Service.CloseStatementResponse response =
-        service.apply(new Service.CloseStatementRequest(h.connectionId, h.id));
+  @Override public void closeStatement(final StatementHandle h) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          public Void call() {
+            final Service.CloseStatementResponse response =
+                service.apply(
+                    new Service.CloseStatementRequest(h.connectionId, h.id));
+            return null;
+          }
+        });
   }
 
-  @Override public void openConnection(ConnectionHandle ch, Map<String, 
String> info) {
-    final Service.OpenConnectionResponse response =
-        service.apply(new Service.OpenConnectionRequest(ch.id, info));
+  @Override public void openConnection(final ConnectionHandle ch, final 
Map<String, String> info) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          public Void call() {
+            final Service.OpenConnectionResponse response =
+                service.apply(new Service.OpenConnectionRequest(ch.id, info));
+            return null;
+          }
+        });
   }
 
-  @Override public void closeConnection(ConnectionHandle ch) {
-    final Service.CloseConnectionResponse response =
-        service.apply(new Service.CloseConnectionRequest(ch.id));
-    propsMap.remove(ch.id);
+  @Override public void closeConnection(final ConnectionHandle ch) {
+    connection.invokeWithRetries(
+        new CallableWithoutException<Void>() {
+          public Void call() {
+            final Service.CloseConnectionResponse response =
+                service.apply(new Service.CloseConnectionRequest(ch.id));
+            propsMap.remove(ch.id);
+            return null;
+          }
+        });
   }
 
-  @Override public ConnectionProperties connectionSync(ConnectionHandle ch,
-      ConnectionProperties connProps) {
-    ConnectionPropertiesImpl localProps = propsMap.get(ch.id);
-    if (localProps == null) {
-      localProps = new ConnectionPropertiesImpl();
-      localProps.setDirty(true);
-      propsMap.put(ch.id, localProps);
-    }
+  @Override public ConnectionProperties connectionSync(final ConnectionHandle 
ch,
+      final ConnectionProperties connProps) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<ConnectionProperties>() {
+          public ConnectionProperties call() {
+            ConnectionPropertiesImpl localProps = propsMap.get(ch.id);
+            if (localProps == null) {
+              localProps = new ConnectionPropertiesImpl();
+              localProps.setDirty(true);
+              propsMap.put(ch.id, localProps);
+            }
 
-    // Only make an RPC if necessary. RPC is necessary when we have local 
changes that need
-    // flushed to the server (be sure to introduce any new changes from 
connProps before checking
-    // AND when connProps.isEmpty() (meaning, this was a request for a value, 
not overriding a
-    // value). Otherwise, accumulate the change locally and return immediately.
-    if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) {
-      final Service.ConnectionSyncResponse response = service.apply(
-          new Service.ConnectionSyncRequest(ch.id, localProps));
-      propsMap.put(ch.id, (ConnectionPropertiesImpl) response.connProps);
-      return response.connProps;
-    } else {
-      return localProps;
-    }
+            // Only make an RPC if necessary. RPC is necessary when we have 
local changes that need
+            // flushed to the server (be sure to introduce any new changes 
from connProps before
+            // checking AND when connProps.isEmpty() (meaning, this was a 
request for a value, not
+            // overriding a value). Otherwise, accumulate the change locally 
and return immediately.
+            if (localProps.merge(connProps).isDirty() && connProps.isEmpty()) {
+              final Service.ConnectionSyncResponse response = service.apply(
+                  new Service.ConnectionSyncRequest(ch.id, localProps));
+              propsMap.put(ch.id, (ConnectionPropertiesImpl) 
response.connProps);
+              return response.connProps;
+            } else {
+              return localProps;
+            }
+          }
+        });
   }
 
-  @Override public MetaResultSet getCatalogs(ConnectionHandle ch) {
-    final Service.ResultSetResponse response =
-        service.apply(new Service.CatalogsRequest(ch.id));
-    return toResultSet(MetaCatalog.class, response);
+  @Override public MetaResultSet getCatalogs(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(new Service.CatalogsRequest(ch.id));
+            return toResultSet(MetaCatalog.class, response);
+          }
+        });
   }
 
-  @Override public MetaResultSet getSchemas(ConnectionHandle ch, String 
catalog,
-      Pat schemaPattern) {
-    final Service.ResultSetResponse response =
-        service.apply(new Service.SchemasRequest(ch.id, catalog, 
schemaPattern.s));
-    return toResultSet(MetaSchema.class, response);
+  @Override public MetaResultSet getSchemas(final ConnectionHandle ch, final 
String catalog,
+      final Pat schemaPattern) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(
+                    new Service.SchemasRequest(ch.id, catalog, 
schemaPattern.s));
+            return toResultSet(MetaSchema.class, response);
+          }
+        });
   }
 
-  @Override public MetaResultSet getTables(ConnectionHandle ch, String 
catalog, Pat schemaPattern,
-      Pat tableNamePattern, List<String> typeList) {
-    final Service.ResultSetResponse response =
-        service.apply(
-            new Service.TablesRequest(ch.id, catalog, schemaPattern.s,
-                tableNamePattern.s, typeList));
-    return toResultSet(MetaTable.class, response);
+  @Override public MetaResultSet getTables(final ConnectionHandle ch, final 
String catalog,
+      final Pat schemaPattern, final Pat tableNamePattern, final List<String> 
typeList) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(
+                    new Service.TablesRequest(ch.id, catalog, schemaPattern.s,
+                        tableNamePattern.s, typeList));
+            return toResultSet(MetaTable.class, response);
+          }
+        });
   }
 
-  @Override public MetaResultSet getTableTypes(ConnectionHandle ch) {
-    final Service.ResultSetResponse response =
-        service.apply(new Service.TableTypesRequest(ch.id));
-    return toResultSet(MetaTableType.class, response);
+  @Override public MetaResultSet getTableTypes(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(new Service.TableTypesRequest(ch.id));
+            return toResultSet(MetaTableType.class, response);
+          }
+        });
   }
 
-  @Override public MetaResultSet getTypeInfo(ConnectionHandle ch) {
-    final Service.ResultSetResponse response =
-        service.apply(new Service.TypeInfoRequest(ch.id));
-    return toResultSet(MetaTypeInfo.class, response);
+  @Override public MetaResultSet getTypeInfo(final ConnectionHandle ch) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(new Service.TypeInfoRequest(ch.id));
+            return toResultSet(MetaTypeInfo.class, response);
+          }
+        });
   }
 
-  @Override public MetaResultSet getColumns(ConnectionHandle ch, String 
catalog, Pat schemaPattern,
-      Pat tableNamePattern, Pat columnNamePattern) {
-    final Service.ResultSetResponse response =
-        service.apply(
-            new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s,
-                tableNamePattern.s, columnNamePattern.s));
-    return toResultSet(MetaColumn.class, response);
+  @Override public MetaResultSet getColumns(final ConnectionHandle ch, final 
String catalog,
+      final Pat schemaPattern, final Pat tableNamePattern, final Pat 
columnNamePattern) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<MetaResultSet>() {
+          public MetaResultSet call() {
+            final Service.ResultSetResponse response =
+                service.apply(
+                    new Service.ColumnsRequest(ch.id, catalog, schemaPattern.s,
+                        tableNamePattern.s, columnNamePattern.s));
+            return toResultSet(MetaColumn.class, response);
+          }
+        });
   }
 
-  @Override public StatementHandle prepare(ConnectionHandle ch, String sql,
-      long maxRowCount) {
-    connectionSync(ch, new ConnectionPropertiesImpl()); // sync connection 
state if necessary
-    final Service.PrepareResponse response = service.apply(
-        new Service.PrepareRequest(ch.id, sql, maxRowCount));
-    return response.statement;
+  @Override public StatementHandle prepare(final ConnectionHandle ch, final 
String sql,
+      final long maxRowCount) {
+    return connection.invokeWithRetries(
+        new CallableWithoutException<StatementHandle>() {
+          public StatementHandle call() {
+            connectionSync(ch,
+                new ConnectionPropertiesImpl()); // sync connection state if 
necessary
+            final Service.PrepareResponse response = service.apply(
+                new Service.PrepareRequest(ch.id, sql, maxRowCount));
+            return response.statement;
+          }
+        });
   }
 
-  @Override public ExecuteResult prepareAndExecute(StatementHandle h,
-      String sql, long maxRowCount, PrepareCallback callback) {
-    // sync connection state if necessary
-    connectionSync(new ConnectionHandle(h.connectionId), new 
ConnectionPropertiesImpl());
-    final Service.ExecuteResponse response;
+  @Override public ExecuteResult prepareAndExecute(final StatementHandle h, 
final String sql,
+      final long maxRowCount, final PrepareCallback callback) throws 
NoSuchStatementException {
     try {
-      synchronized (callback.getMonitor()) {
-        callback.clear();
-        response = service.apply(
-            new Service.PrepareAndExecuteRequest(h.connectionId,
-              h.id, sql, maxRowCount));
-        if (response.results.size() > 0) {
-          final Service.ResultSetResponse result = response.results.get(0);
-          callback.assign(result.signature, result.firstFrame,
-              result.updateCount);
-        }
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ExecuteResult>() {
+            public ExecuteResult call() {
+              // sync connection state if necessary
+              connectionSync(new ConnectionHandle(h.connectionId), new 
ConnectionPropertiesImpl());
+              final Service.ExecuteResponse response;
+              try {
+                synchronized (callback.getMonitor()) {
+                  callback.clear();
+                  response = service.apply(
+                      new Service.PrepareAndExecuteRequest(h.connectionId,
+                          h.id, sql, maxRowCount));
+                  if (response.missingStatement) {
+                    throw new RuntimeException(new 
NoSuchStatementException(h));
+                  }
+                  if (response.results.size() > 0) {
+                    final Service.ResultSetResponse result = 
response.results.get(0);
+                    callback.assign(result.signature, result.firstFrame,
+                        result.updateCount);
+                  }
+                }
+                callback.execute();
+                List<MetaResultSet> metaResultSets = new ArrayList<>();
+                for (Service.ResultSetResponse result : response.results) {
+                  metaResultSets.add(toResultSet(null, result));
+                }
+                return new ExecuteResult(metaResultSets);
+              } catch (SQLException e) {
+                throw new RuntimeException(e);
+              }
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
       }
-      callback.execute();
-      List<MetaResultSet> metaResultSets = new ArrayList<>();
-      for (Service.ResultSetResponse result : response.results) {
-        metaResultSets.add(toResultSet(null, result));
-      }
-      return new ExecuteResult(metaResultSets);
-    } catch (SQLException e) {
-      throw new RuntimeException(e);
+      throw e;
     }
   }
 
-  @Override public Frame fetch(StatementHandle h, long offset, int 
fetchMaxRowCount) {
-    final Service.FetchResponse response =
-        service.apply(
-            new Service.FetchRequest(h.connectionId, h.id, offset, 
fetchMaxRowCount));
-    return response.frame;
+  @Override public Frame fetch(final StatementHandle h, final long offset,
+      final int fetchMaxRowCount) throws NoSuchStatementException, 
MissingResultsException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<Frame>() {
+            public Frame call() {
+              final Service.FetchResponse response =
+                  service.apply(
+                      new Service.FetchRequest(h.connectionId, h.id, offset, 
fetchMaxRowCount));
+              if (response.missingStatement) {
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+              if (response.missingResults) {
+                throw new RuntimeException(new MissingResultsException(h));
+              }
+              return response.frame;
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      } else if (cause instanceof MissingResultsException) {
+        throw (MissingResultsException) cause;
+      }
+      throw e;
+    }
   }
 
-  @Override public ExecuteResult execute(StatementHandle h,
-      List<TypedValue> parameterValues, long maxRowCount) {
-    final Service.ExecuteResponse response = service.apply(
-        new Service.ExecuteRequest(h, parameterValues, maxRowCount));
+  @Override public ExecuteResult execute(final StatementHandle h,
+      final List<TypedValue> parameterValues, final long maxRowCount)
+      throws NoSuchStatementException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<ExecuteResult>() {
+            public ExecuteResult call() {
+              final Service.ExecuteResponse response = service.apply(
+                  new Service.ExecuteRequest(h, parameterValues, maxRowCount));
+
+              if (response.missingStatement) {
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+
+              List<MetaResultSet> metaResultSets = new ArrayList<>();
+              for (Service.ResultSetResponse result : response.results) {
+                metaResultSets.add(toResultSet(null, result));
+              }
 
-    List<MetaResultSet> metaResultSets = new ArrayList<>();
-    for (Service.ResultSetResponse result : response.results) {
-      metaResultSets.add(toResultSet(null, result));
+              return new ExecuteResult(metaResultSets);
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      throw e;
     }
+  }
 
-    return new ExecuteResult(metaResultSets);
+  @Override public boolean syncResults(final StatementHandle h, final 
QueryState state,
+      final long offset) throws NoSuchStatementException {
+    try {
+      return connection.invokeWithRetries(
+          new CallableWithoutException<Boolean>() {
+            public Boolean call() {
+              final Service.SyncResultsResponse response =
+                  service.apply(
+                      new Service.SyncResultsRequest(h.connectionId, h.id, 
state, offset));
+              if (response.missingStatement) {
+                throw new RuntimeException(new NoSuchStatementException(h));
+              }
+              return response.moreResults;
+            }
+          });
+    } catch (RuntimeException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof NoSuchStatementException) {
+        throw (NoSuchStatementException) cause;
+      }
+      throw e;
+    }
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
index fa447de..828513a 100644
--- 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
+++ 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteProtobufService.java
@@ -16,56 +16,26 @@
  */
 package org.apache.calcite.avatica.remote;
 
-import org.apache.calcite.avatica.AvaticaUtils;
-
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
 
 /**
  * ProtobufService implementation that queries against a remote 
implementation, using
  * protocol buffers as the serialized form.
  */
 public class RemoteProtobufService extends ProtobufService {
-  private final URL url;
+  private final AvaticaHttpClient client;
   private final ProtobufTranslation translation;
 
-  public RemoteProtobufService(URL url, ProtobufTranslation translation) {
-    this.url = url;
+  public RemoteProtobufService(AvaticaHttpClient client, ProtobufTranslation 
translation) {
+    this.client = client;
     this.translation = translation;
   }
 
   @Override public Response _apply(Request request) {
-    final InputStream inputStream;
-
-    try {
-      final HttpURLConnection connection =
-          (HttpURLConnection) url.openConnection();
-      connection.setRequestMethod("POST");
-      connection.setDoInput(true);
-      connection.setDoOutput(true);
-      try (DataOutputStream wr = new 
DataOutputStream(connection.getOutputStream())) {
-        // Convert the Request to a protobuf and send it over the wire
-        wr.write(translation.serializeRequest(request));
-        wr.flush();
-        wr.close();
-      }
-      final int responseCode = connection.getResponseCode();
-      if (responseCode != HttpURLConnection.HTTP_OK) {
-        inputStream = connection.getErrorStream();
-      } else {
-        inputStream = connection.getInputStream();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    Response resp;
+    final Response resp;
     try {
-      // Read the (serialized protobuf) response off the wire and convert it 
back to a Response
-      resp = 
translation.parseResponse(AvaticaUtils.readFullyToBytes(inputStream));
+      byte[] response = client.send(translation.serializeRequest(request));
+      resp = translation.parseResponse(response);
     } catch (IOException e) {
       // Not a protobuf that we could parse.
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
index f661cbd..d4828b5 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/RemoteService.java
@@ -16,13 +16,7 @@
  */
 package org.apache.calcite.avatica.remote;
 
-import org.apache.calcite.avatica.AvaticaUtils;
-
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
+import java.nio.charset.StandardCharsets;
 
 /**
  * Implementation of {@link org.apache.calcite.avatica.remote.Service}
@@ -30,42 +24,15 @@ import java.net.URL;
  * usually an HTTP server.
  */
 public class RemoteService extends JsonService {
-  private final URL url;
+  private final AvaticaHttpClient client;
 
-  public RemoteService(URL url) {
-    this.url = url;
+  public RemoteService(AvaticaHttpClient client) {
+    this.client = client;
   }
 
   @Override public String apply(String request) {
-    try {
-      final HttpURLConnection connection =
-          (HttpURLConnection) url.openConnection();
-      connection.setRequestMethod("POST");
-      connection.setDoInput(true);
-      connection.setDoOutput(true);
-      if (request.length() < 256) {
-        connection.setRequestProperty("request", request);
-      } else {
-        try (DataOutputStream wr
-            = new DataOutputStream(connection.getOutputStream())) {
-          wr.writeBytes(request);
-          wr.flush();
-          wr.close();
-        }
-      }
-      final int responseCode = connection.getResponseCode();
-      final InputStream inputStream;
-      if (responseCode != HttpURLConnection.HTTP_OK) {
-        inputStream = connection.getErrorStream();
-      } else {
-        inputStream = connection.getInputStream();
-      }
-
-      return AvaticaUtils.readFully(inputStream);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
+    byte[] response = client.send(request.getBytes(StandardCharsets.UTF_8));
+    return new String(response, StandardCharsets.UTF_8);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java 
b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
index 2309661..473e96c 100644
--- a/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
+++ b/avatica/src/main/java/org/apache/calcite/avatica/remote/Service.java
@@ -19,8 +19,10 @@ package org.apache.calcite.avatica.remote;
 import org.apache.calcite.avatica.AvaticaClientRuntimeException;
 import org.apache.calcite.avatica.AvaticaConnection;
 import org.apache.calcite.avatica.AvaticaSeverity;
+import org.apache.calcite.avatica.BuiltInConnectionProperty;
 import org.apache.calcite.avatica.ConnectionPropertiesImpl;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.QueryState;
 import org.apache.calcite.avatica.proto.Common;
 import org.apache.calcite.avatica.proto.Requests;
 import org.apache.calcite.avatica.proto.Responses;
@@ -29,6 +31,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
 import com.google.protobuf.Descriptors.Descriptor;
 import com.google.protobuf.Message;
 
@@ -42,6 +45,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
+import java.util.Properties;
 
 /**
  * API for request-response calls to an Avatica server.
@@ -56,6 +60,7 @@ public interface Service {
   PrepareResponse apply(PrepareRequest request);
   ExecuteResponse apply(ExecuteRequest request);
   ExecuteResponse apply(PrepareAndExecuteRequest request);
+  SyncResultsResponse apply(SyncResultsRequest request);
   FetchResponse apply(FetchRequest request);
   CreateStatementResponse apply(CreateStatementRequest request);
   CloseStatementResponse apply(CloseStatementRequest request);
@@ -95,7 +100,8 @@ public interface Service {
       @JsonSubTypes.Type(value = CloseConnectionRequest.class,
           name = "closeConnection"),
       @JsonSubTypes.Type(value = ConnectionSyncRequest.class, name = 
"connectionSync"),
-      @JsonSubTypes.Type(value = DatabasePropertyRequest.class, name = 
"databaseProperties") })
+      @JsonSubTypes.Type(value = DatabasePropertyRequest.class, name = 
"databaseProperties"),
+      @JsonSubTypes.Type(value = SyncResultsRequest.class, name = 
"syncResults") })
   abstract class Request {
     abstract Response accept(Service service);
     abstract Request deserialize(Message genericMsg);
@@ -121,7 +127,8 @@ public interface Service {
       @JsonSubTypes.Type(value = ConnectionSyncResponse.class, name = 
"connectionSync"),
       @JsonSubTypes.Type(value = DatabasePropertyResponse.class, name = 
"databaseProperties"),
       @JsonSubTypes.Type(value = ExecuteResponse.class, name = 
"executeResults"),
-      @JsonSubTypes.Type(value = ErrorResponse.class, name = "error") })
+      @JsonSubTypes.Type(value = ErrorResponse.class, name = "error"),
+      @JsonSubTypes.Type(value = SyncResultsResponse.class, name = 
"syncResults") })
   abstract class Response {
     abstract Response deserialize(Message genericMsg);
     abstract Message serialize();
@@ -1242,15 +1249,17 @@ public interface Service {
    * {@link 
org.apache.calcite.avatica.remote.Service.PrepareAndExecuteRequest}. */
   class ExecuteResponse extends Response {
     public final List<ResultSetResponse> results;
+    public boolean missingStatement = false;
 
     ExecuteResponse() {
       results = null;
     }
 
     @JsonCreator
-    public ExecuteResponse(
-        @JsonProperty("resultSets") List<ResultSetResponse> results) {
+    public ExecuteResponse(@JsonProperty("resultSets") List<ResultSetResponse> 
results,
+        @JsonProperty("missingStatement") boolean missingStatement) {
       this.results = results;
+      this.missingStatement = missingStatement;
     }
 
     @Override ExecuteResponse deserialize(Message genericMsg) {
@@ -1268,7 +1277,7 @@ public interface Service {
         copiedResults.add(ResultSetResponse.fromProto(msgResult));
       }
 
-      return new ExecuteResponse(copiedResults);
+      return new ExecuteResponse(copiedResults, msg.getMissingStatement());
     }
 
     @Override Responses.ExecuteResponse serialize() {
@@ -1278,7 +1287,7 @@ public interface Service {
         builder.addResults(result.serialize());
       }
 
-      return builder.build();
+      return builder.setMissingStatement(missingStatement).build();
     }
 
     @Override public int hashCode() {
@@ -1581,14 +1590,20 @@ public interface Service {
    * {@link org.apache.calcite.avatica.remote.Service.FetchRequest}. */
   class FetchResponse extends Response {
     public final Meta.Frame frame;
+    public boolean missingStatement = false;
+    public boolean missingResults = false;
 
     FetchResponse() {
       frame = null;
     }
 
     @JsonCreator
-    public FetchResponse(@JsonProperty("frame") Meta.Frame frame) {
+    public FetchResponse(@JsonProperty("frame") Meta.Frame frame,
+        @JsonProperty("missingStatement") boolean missingStatement,
+        @JsonProperty("missingResults") boolean missingResults) {
       this.frame = frame;
+      this.missingStatement = missingStatement;
+      this.missingResults = missingResults;
     }
 
     @Override FetchResponse deserialize(Message genericMsg) {
@@ -1599,7 +1614,8 @@ public interface Service {
 
       Responses.FetchResponse msg = (Responses.FetchResponse) genericMsg;
 
-      return new FetchResponse(Meta.Frame.fromProto(msg.getFrame()));
+      return new FetchResponse(Meta.Frame.fromProto(msg.getFrame()), 
msg.getMissingStatement(),
+          msg.getMissingResults());
     }
 
     @Override Responses.FetchResponse serialize() {
@@ -1609,7 +1625,8 @@ public interface Service {
         builder.setFrame(frame.toProto());
       }
 
-      return builder.build();
+      return builder.setMissingStatement(missingStatement)
+          .setMissingResults(missingResults).build();
     }
 
     @Override public int hashCode() {
@@ -1634,7 +1651,7 @@ public interface Service {
           return false;
         }
 
-        return true;
+        return missingStatement == other.missingStatement;
       }
 
       return false;
@@ -1932,6 +1949,31 @@ public interface Service {
       return service.apply(this);
     }
 
+    /**
+     * Serializes the necessary properties into a Map.
+     *
+     * @param props The properties to serialize.
+     * @return A representation of the Properties as a Map.
+     */
+    public static Map<String, String> serializeProperties(Properties props) {
+      Map<String, String> infoAsString = new HashMap<>();
+      for (Map.Entry<Object, Object> entry : props.entrySet()) {
+        // Determine if this is a property we want to forward to the server
+        boolean localProperty = false;
+        for (BuiltInConnectionProperty prop : 
BuiltInConnectionProperty.values()) {
+          if (prop.camelName().equals(entry.getKey())) {
+            localProperty = true;
+            break;
+          }
+        }
+
+        if (!localProperty) {
+          infoAsString.put(entry.getKey().toString(), 
entry.getValue().toString());
+        }
+      }
+      return infoAsString;
+    }
+
     @Override Request deserialize(Message genericMsg) {
       if (!(genericMsg instanceof Requests.OpenConnectionRequest)) {
         throw new IllegalArgumentException(
@@ -2452,6 +2494,8 @@ public interface Service {
    */
   public class ErrorResponse extends Response {
     public static final int UNKNOWN_ERROR_CODE = -1;
+    public static final int MISSING_CONNECTION_ERROR_CODE = 1;
+
     public static final String UNKNOWN_SQL_STATE = "00000";
 
     public final List<String> exceptions;
@@ -2590,6 +2634,169 @@ public interface Service {
           sqlState, severity, exceptions);
     }
   }
+
+  /**
+   * Request for {@link Service#apply(SyncResultsRequest)}
+   */
+  class SyncResultsRequest extends Request {
+    public final String connectionId;
+    public final int statementId;
+    public final QueryState state;
+    public final long offset;
+
+    SyncResultsRequest() {
+      this.connectionId = null;
+      this.statementId = 0;
+      this.state = null;
+      this.offset = 0L;
+    }
+
+    public SyncResultsRequest(@JsonProperty("connectionId") String 
connectionId,
+        @JsonProperty("statementId") int statementId, @JsonProperty("state") 
QueryState state,
+        @JsonProperty("offset") long offset) {
+      this.connectionId = connectionId;
+      this.statementId = statementId;
+      this.state = state;
+      this.offset = offset;
+    }
+
+    SyncResultsResponse accept(Service service) {
+      return service.apply(this);
+    }
+
+    Request deserialize(Message genericMsg) {
+      if (!(genericMsg instanceof Requests.SyncResultsRequest)) {
+        throw new IllegalArgumentException(
+            "Expected SyncResultsRequest, but got " + 
genericMsg.getClass().getName());
+      }
+
+      Requests.SyncResultsRequest msg = (Requests.SyncResultsRequest) 
genericMsg;
+
+      return new SyncResultsRequest(msg.getConnectionId(), 
msg.getStatementId(),
+          QueryState.fromProto(msg.getState()), msg.getOffset());
+    }
+
+    Requests.SyncResultsRequest serialize() {
+      Requests.SyncResultsRequest.Builder builder = 
Requests.SyncResultsRequest.newBuilder();
+
+      if (null != connectionId) {
+        builder.setConnectionId(connectionId);
+      }
+
+      if (null != state) {
+        builder.setState(state.toProto());
+      }
+
+      builder.setStatementId(statementId);
+      builder.setOffset(offset);
+
+      return builder.build();
+    }
+
+    @Override public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((connectionId == null) ? 0 : 
connectionId.hashCode());
+      result = prime * result + (int) (offset ^ (offset >>> 32));
+      result = prime * result + ((state == null) ? 0 : state.hashCode());
+      result = prime * result + statementId;
+      return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+
+      if (null == obj || !(obj instanceof SyncResultsRequest)) {
+        return false;
+      }
+
+      SyncResultsRequest other = (SyncResultsRequest) obj;
+
+      if (connectionId == null) {
+        if (other.connectionId != null) {
+          return false;
+        }
+      } else if (!connectionId.equals(other.connectionId)) {
+        return false;
+      }
+
+      if (offset != other.offset) {
+        return false;
+      }
+
+      if (state == null) {
+        if (other.state != null) {
+          return false;
+        }
+      } else if (!state.equals(other.state)) {
+        return false;
+      }
+
+      if (statementId != other.statementId) {
+        return false;
+      }
+
+      return true;
+    }
+  }
+
+  /**
+   * Response for {@link Service#apply(SyncResultsRequest)}.
+   */
+  class SyncResultsResponse extends Response {
+    public boolean missingStatement = false;
+    public final boolean moreResults;
+
+    SyncResultsResponse() {
+      this.moreResults = false;
+    }
+
+    public SyncResultsResponse(@JsonProperty("moreResults") boolean 
moreResults,
+        @JsonProperty("missingStatement") boolean missingStatement) {
+      this.moreResults = moreResults;
+      this.missingStatement = missingStatement;
+    }
+
+    SyncResultsResponse deserialize(Message genericMsg) {
+      if (!(genericMsg instanceof Responses.SyncResultsResponse)) {
+        throw new IllegalArgumentException(
+            "Expected SyncResultsResponse, but got " + 
genericMsg.getClass().getName());
+      }
+
+      Responses.SyncResultsResponse msg = (Responses.SyncResultsResponse) 
genericMsg;
+
+      return new SyncResultsResponse(msg.getMoreResults(), 
msg.getMissingStatement());
+    }
+
+    Responses.SyncResultsResponse serialize() {
+      Responses.SyncResultsResponse.Builder builder = 
Responses.SyncResultsResponse.newBuilder();
+
+      return 
builder.setMoreResults(moreResults).setMissingStatement(missingStatement).build();
+    }
+
+    @Override public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + (missingStatement ? 1231 : 1237);
+      result = prime * result + (moreResults ? 1231 : 1237);
+      return result;
+    }
+
+    @Override public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null || !(obj instanceof SyncResultsResponse)) {
+        return false;
+      }
+
+      SyncResultsResponse other = (SyncResultsResponse) obj;
+
+      return missingStatement == other.missingStatement && moreResults == 
other.moreResults;
+    }
+  }
 }
 
 // End Service.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/common.proto
----------------------------------------------------------------------
diff --git a/avatica/src/main/protobuf/common.proto 
b/avatica/src/main/protobuf/common.proto
index f4de1f9..5421ffc 100644
--- a/avatica/src/main/protobuf/common.proto
+++ b/avatica/src/main/protobuf/common.proto
@@ -206,3 +206,67 @@ enum Severity {
   ERROR_SEVERITY = 2;
   WARNING_SEVERITY = 3;
 }
+
+// Enumeration corresponding to DatabaseMetaData operations
+enum MetaDataOperation {
+  GET_ATTRIBUTES = 0;
+  GET_BEST_ROW_IDENTIFIER = 1;
+  GET_CATALOGS = 2;
+  GET_CLIENT_INFO_PROPERTIES = 3;
+  GET_COLUMN_PRIVILEGES = 4;
+  GET_COLUMNS = 5;
+  GET_CROSS_REFERENCE = 6;
+  GET_EXPORTED_KEYS = 7;
+  GET_FUNCTION_COLUMNS = 8;
+  GET_FUNCTIONS = 9;
+  GET_IMPORTED_KEYS = 10;
+  GET_INDEX_INFO = 11;
+  GET_PRIMARY_KEYS = 12;
+  GET_PROCEDURE_COLUMNS = 13;
+  GET_PROCEDURES = 14;
+  GET_PSEUDO_COLUMNS = 15;
+  GET_SCHEMAS = 16;
+  GET_SCHEMAS_WITH_ARGS = 17;
+  GET_SUPER_TABLES = 18;
+  GET_SUPER_TYPES = 19;
+  GET_TABLE_PRIVILEGES = 20;
+  GET_TABLES = 21;
+  GET_TABLE_TYPES = 22;
+  GET_TYPE_INFO = 23;
+  GET_UDTS = 24;
+  GET_VERSION_COLUMNS = 25;
+}
+
+// Represents the breadth of arguments to DatabaseMetaData functions
+message MetaDataOperationArgument {
+  enum ArgumentType {
+    STRING = 0;
+    BOOL = 1;
+    INT = 2;
+    REPEATED_STRING = 3;
+    REPEATED_INT = 4;
+    NULL = 5;
+  }
+    
+  string string_value = 1;
+  bool bool_value = 2;
+  sint32 int_value = 3;
+  repeated string string_array_values = 4;
+  repeated sint32 int_array_values = 5;
+  ArgumentType type = 6;
+}
+
+enum StateType {
+  SQL = 0;
+  METADATA = 1;
+}
+
+message QueryState {
+  StateType type = 1;
+  string sql = 2;
+  MetaDataOperation op = 3;
+  repeated MetaDataOperationArgument args = 4;
+  bool has_args = 5;
+  bool has_sql = 6;
+  bool has_op = 7;
+}

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/requests.proto
----------------------------------------------------------------------
diff --git a/avatica/src/main/protobuf/requests.proto 
b/avatica/src/main/protobuf/requests.proto
index 02451da..4201e3b 100644
--- a/avatica/src/main/protobuf/requests.proto
+++ b/avatica/src/main/protobuf/requests.proto
@@ -127,3 +127,10 @@ message ExecuteRequest {
 }
 
 
+message SyncResultsRequest {
+  string connection_id = 1;
+  uint32 statement_id = 2;
+  QueryState state = 3;
+  uint64 offset = 4;
+}
+ 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/main/protobuf/responses.proto
----------------------------------------------------------------------
diff --git a/avatica/src/main/protobuf/responses.proto 
b/avatica/src/main/protobuf/responses.proto
index 7c52be7..a899513 100644
--- a/avatica/src/main/protobuf/responses.proto
+++ b/avatica/src/main/protobuf/responses.proto
@@ -34,6 +34,7 @@ message ResultSetResponse {
 // Response to PrepareAndExecuteRequest
 message ExecuteResponse {
   repeated ResultSetResponse results = 1;
+  bool missing_statement = 2; // Did the request fail because of no-cached 
statement
 }
 
 // Response to PrepareRequest
@@ -44,6 +45,8 @@ message PrepareResponse {
 // Response to FetchRequest
 message FetchResponse {
   Frame frame = 1;
+  bool missing_statement = 2; // Did the request fail because of no-cached 
statement
+  bool missing_results = 3; // Did the request fail because of a 
cached-statement w/o ResultSet
 }
 
 // Response to CreateStatementRequest
@@ -90,3 +93,8 @@ message ErrorResponse {
   uint32 error_code = 4; // numeric identifier for error
   string sql_state = 5; // five-character standard-defined value
 }
+
+message SyncResultsResponse {
+  bool missing_statement = 1; // Server doesn't have the statement with the ID 
from the request
+  bool more_results = 2; // Should the client fetch() to get more results
+}

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java 
b/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java
new file mode 100644
index 0000000..a1414c3
--- /dev/null
+++ 
b/avatica/src/test/java/org/apache/calcite/avatica/AvaticaConnectionTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Properties;
+
+/**
+ * Tests for AvaticaConnection
+ */
+public class AvaticaConnectionTest {
+
+  @Test
+  public void testNumExecuteRetries() {
+    AvaticaConnection statement = Mockito.mock(AvaticaConnection.class);
+
+    
Mockito.when(statement.getNumStatementRetries(Mockito.any(Properties.class)))
+      .thenCallRealMethod();
+
+    // Bad argument should throw an exception
+    try {
+      statement.getNumStatementRetries(null);
+      Assert.fail("Calling getNumStatementRetries with a null object should 
throw an exception");
+    } catch (NullPointerException e) {
+      // Pass
+    }
+
+    Properties props = new Properties();
+
+    // Verify the default value
+    
Assert.assertEquals(Long.valueOf(AvaticaConnection.NUM_EXECUTE_RETRIES_DEFAULT).longValue(),
+        statement.getNumStatementRetries(props));
+
+    // Set a non-default value
+    props.setProperty(AvaticaConnection.NUM_EXECUTE_RETRIES_KEY, "10");
+
+    // Verify that we observe that value
+    Assert.assertEquals(10, statement.getNumStatementRetries(props));
+  }
+
+}
+
+// End AvaticaConnectionTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java 
b/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java
new file mode 100644
index 0000000..d97bfa2
--- /dev/null
+++ b/avatica/src/test/java/org/apache/calcite/avatica/QueryStateTest.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica;
+
+import org.apache.calcite.avatica.remote.MetaDataOperation;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests that {@link QueryState} properly retains the necessary state to 
recreate
+ * a {@link ResultSet}.
+ */
+public class QueryStateTest {
+
+  private Connection conn;
+  private DatabaseMetaData metadata;
+  private Statement statement;
+
+
+  @Before
+  public void setup() throws Exception {
+    conn = Mockito.mock(Connection.class);
+    metadata = Mockito.mock(DatabaseMetaData.class);
+    statement = Mockito.mock(Statement.class);
+
+    Mockito.when(conn.getMetaData()).thenReturn(metadata);
+  }
+
+  @Test
+  public void testMetadataGetAttributes() throws Exception {
+    final String catalog = "catalog";
+    final String schemaPattern = null;
+    final String typeNamePattern = "%";
+    final String attributeNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_ATTRIBUTES, 
catalog, schemaPattern,
+        typeNamePattern, attributeNamePattern);
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getAttributes(catalog, schemaPattern, 
typeNamePattern,
+        attributeNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetBestRowIdentifier() throws Exception {
+    final String catalog = "catalog";
+    final String schema = null;
+    final String table = "table";
+    final int scope = 1;
+    final boolean nullable = true;
+
+    QueryState state = new 
QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, new Object[] {
+      catalog,
+      schema,
+      table,
+      scope,
+      nullable
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getBestRowIdentifier(catalog, schema, table, 
scope, nullable);
+  }
+
+  @Test
+  public void testMetadataGetCatalogs() throws Exception {
+    QueryState state = new QueryState(MetaDataOperation.GET_CATALOGS, new 
Object[0]);
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getCatalogs();
+  }
+
+  @Test
+  public void testMetadataGetColumnPrivileges() throws Exception {
+    final String catalog = null;
+    final String schema = "schema";
+    final String table = "table";
+    final String columnNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_COLUMN_PRIVILEGES, 
new Object[] {
+      catalog,
+      schema,
+      table,
+      columnNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getColumnPrivileges(catalog, schema, table, 
columnNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetColumns() throws Exception {
+    final String catalog = null;
+    final String schemaPattern = "%";
+    final String tableNamePattern = "%";
+    final String columnNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_COLUMNS, new 
Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern,
+      columnNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getColumns(catalog, schemaPattern, 
tableNamePattern,
+        columnNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetCrossReference() throws Exception {
+    final String parentCatalog = null;
+    final String parentSchema = null;
+    final String parentTable = "%";
+    final String foreignCatalog = null;
+    final String foreignSchema = null;
+    final String foreignTable = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_CROSS_REFERENCE, 
new Object[] {
+      parentCatalog,
+      parentSchema,
+      parentTable,
+      foreignCatalog,
+      foreignSchema,
+      foreignTable
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getCrossReference(parentCatalog, parentSchema, 
parentTable,
+        foreignCatalog, foreignSchema, foreignTable);
+  }
+
+  @Test
+  public void testMetadataGetExportedKeys() throws Exception {
+    final String catalog = "";
+    final String schema = null;
+    final String table = "mytable";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_EXPORTED_KEYS, new 
Object[] {
+      catalog,
+      schema,
+      table
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getExportedKeys(catalog, schema, table);
+  }
+
+  @Test
+  public void testMetadataGetFunctionColumns() throws Exception {
+    final String catalog = null;
+    final String schemaPattern = "%";
+    final String functionNamePattern = "%";
+    final String columnNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_FUNCTION_COLUMNS, 
new Object[] {
+      catalog,
+      schemaPattern,
+      functionNamePattern,
+      columnNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getFunctionColumns(catalog, schemaPattern, 
functionNamePattern,
+        columnNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetFunctions() throws Exception {
+    final String catalog = null;
+    final String schemaPattern = "%";
+    final String functionNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_FUNCTIONS, new 
Object[] {
+      catalog,
+      schemaPattern,
+      functionNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getFunctions(catalog, schemaPattern, 
functionNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetImportedKeys() throws Exception {
+    final String catalog = "";
+    final String schema = null;
+    final String table = "my_table";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_IMPORTED_KEYS, new 
Object[] {
+      catalog,
+      schema,
+      table
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getImportedKeys(catalog, schema, table);
+  }
+
+  @Test
+  public void testMetadataGetIndexInfo() throws Exception {
+    final String catalog = "";
+    final String schema = null;
+    final String table = "my_table";
+    final boolean unique = true;
+    final boolean approximate = true;
+
+    QueryState state = new QueryState(MetaDataOperation.GET_INDEX_INFO, new 
Object[] {
+      catalog,
+      schema,
+      table,
+      unique,
+      approximate
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getIndexInfo(catalog, schema, table, unique, 
approximate);
+  }
+
+  @Test
+  public void testMetadataGetPrimaryKeys() throws Exception {
+    final String catalog = "";
+    final String schema = null;
+    final String table = "my_table";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_PRIMARY_KEYS, new 
Object[] {
+      catalog,
+      schema,
+      table
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getPrimaryKeys(catalog, schema, table);
+  }
+
+  @Test
+  public void testMetadataGetProcedureColumns() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String procedureNamePattern = "%";
+    final String columnNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_PROCEDURE_COLUMNS, 
new Object[] {
+      catalog,
+      schemaPattern,
+      procedureNamePattern,
+      columnNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getProcedureColumns(catalog, schemaPattern, 
procedureNamePattern,
+        columnNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetProcedures() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String procedureNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_PROCEDURES, new 
Object[] {
+      catalog,
+      schemaPattern,
+      procedureNamePattern,
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getProcedures(catalog, schemaPattern, 
procedureNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetPseudoColumns() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String tableNamePattern = "%";
+    final String columnNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_PSEUDO_COLUMNS, 
new Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern,
+      columnNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getPseudoColumns(catalog, schemaPattern, 
tableNamePattern,
+        columnNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetSchemas() throws Exception {
+    QueryState state = new QueryState(MetaDataOperation.GET_SCHEMAS, new 
Object[0]);
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getSchemas();
+  }
+
+  @Test
+  public void testMetadataGetSchemasWithArgs() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+
+    QueryState state = new QueryState(MetaDataOperation.GET_SCHEMAS_WITH_ARGS, 
new Object[] {
+      catalog,
+      schemaPattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getSchemas(catalog, schemaPattern);
+  }
+
+  @Test
+  public void testMetadataGetSuperTables() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String tableNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_SUPER_TABLES, new 
Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getSuperTables(catalog, schemaPattern, 
tableNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetSuperTypes() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String tableNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_SUPER_TYPES, new 
Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getSuperTypes(catalog, schemaPattern, 
tableNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetTablePrivileges() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String tableNamePattern = "%";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_TABLE_PRIVILEGES, 
new Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getTablePrivileges(catalog, schemaPattern, 
tableNamePattern);
+  }
+
+  @Test
+  public void testMetadataGetTables() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String tableNamePattern = "%";
+    final String[] types = new String[] {"VIEW", "TABLE"};
+
+    QueryState state = new QueryState(MetaDataOperation.GET_TABLES, new 
Object[] {
+      catalog,
+      schemaPattern,
+      tableNamePattern,
+      types
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getTables(catalog, schemaPattern, 
tableNamePattern, types);
+  }
+
+  @Test
+  public void testMetadataGetTableTypes() throws Exception {
+    QueryState state = new QueryState(MetaDataOperation.GET_TABLE_TYPES, new 
Object[0]);
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getTableTypes();
+  }
+
+  @Test
+  public void testMetadataGetTypeInfo() throws Exception {
+    QueryState state = new QueryState(MetaDataOperation.GET_TYPE_INFO, new 
Object[0]);
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getTypeInfo();
+  }
+
+  @Test
+  public void testMetadataGetUDTs() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String typeNamePattern = "%";
+    final int[] types = new int[] {1, 2};
+
+    QueryState state = new QueryState(MetaDataOperation.GET_UDTS, new Object[] 
{
+      catalog,
+      schemaPattern,
+      typeNamePattern,
+      types
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getUDTs(catalog, schemaPattern, typeNamePattern, 
types);
+  }
+
+  @Test
+  public void testMetadataGetVersionColumns() throws Exception {
+    final String catalog = "";
+    final String schemaPattern = null;
+    final String table = "my_table";
+
+    QueryState state = new QueryState(MetaDataOperation.GET_VERSION_COLUMNS, 
new Object[] {
+      catalog,
+      schemaPattern,
+      table
+    });
+
+    state.invoke(conn, statement);
+
+    Mockito.verify(metadata).getVersionColumns(catalog, schemaPattern, table);
+  }
+
+  @Test
+  public void testSerialization() throws Exception {
+    final String catalog = "catalog";
+    final String schema = null;
+    final String table = "table";
+    final int scope = 1;
+    final boolean nullable = true;
+
+    QueryState state = new 
QueryState(MetaDataOperation.GET_BEST_ROW_IDENTIFIER, new Object[] {
+      catalog,
+      schema,
+      table,
+      scope,
+      nullable
+    });
+
+    assertEquals(state, QueryState.fromProto(state.toProto()));
+
+    final String schemaPattern = null;
+    final String typeNamePattern = "%";
+    final int[] types = new int[] {1, 2};
+
+    state = new QueryState(MetaDataOperation.GET_UDTS, new Object[] {
+      catalog,
+      schemaPattern,
+      typeNamePattern,
+      types
+    });
+
+    assertEquals(state, QueryState.fromProto(state.toProto()));
+
+    state = new QueryState("SELECT * FROM foo");
+
+    assertEquals(state, QueryState.fromProto(state.toProto()));
+  }
+
+}
+
+// End QueryStateTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java
 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java
new file mode 100644
index 0000000..33369d2
--- /dev/null
+++ 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/AvaticaHttpClientTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+
+import static org.junit.Assert.assertArrayEquals;
+
+/**
+ * Tests for the HTTP transport.
+ */
+public class AvaticaHttpClientTest {
+  private static final String REQUEST =
+      
"{\"request\":\"createStatement\",\"connectionId\":\"8f3f28ee-d0bb-4cdb-a4b1-8f6e8476c534\"}";
+  private static final String RESPONSE =
+      "{\"response\":\"createStatement\",\"connectionId\":"
+          + 
"\"8f3f28ee-d0bb-4cdb-a4b1-8f6e8476c534\",\"statementId\":1608176856}";
+
+  @Test
+  public void testRetryOnUnavailable() throws Exception {
+    // HTTP-503, try again
+    URL url = new URL("http://127.0.0.1:8765";);
+    final HttpURLConnection cnxn = Mockito.mock(HttpURLConnection.class);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    ByteArrayInputStream bais = new 
ByteArrayInputStream(RESPONSE.getBytes(StandardCharsets.UTF_8));
+
+    // Create the HTTP client
+    AvaticaHttpClientImpl client = new AvaticaHttpClientImpl(url) {
+      @Override HttpURLConnection openConnection() throws IOException {
+        return cnxn;
+      }
+    };
+
+    // HTTP 503 then 200
+    
Mockito.when(cnxn.getResponseCode()).thenReturn(HttpURLConnection.HTTP_UNAVAILABLE,
+        HttpURLConnection.HTTP_OK);
+
+    Mockito.when(cnxn.getOutputStream()).thenReturn(baos);
+    Mockito.when(cnxn.getInputStream()).thenReturn(bais);
+
+    byte[] response = client.send(REQUEST.getBytes(StandardCharsets.UTF_8));
+
+    assertArrayEquals(RESPONSE.getBytes(StandardCharsets.UTF_8), response);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testServerError() throws Exception {
+    // HTTP 500 should error out
+    URL url = new URL("http://127.0.0.1:8765";);
+    final HttpURLConnection cnxn = Mockito.mock(HttpURLConnection.class);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+    // Create the HTTP client
+    AvaticaHttpClientImpl client = new AvaticaHttpClientImpl(url) {
+      @Override HttpURLConnection openConnection() throws IOException {
+        return cnxn;
+      }
+    };
+
+    // HTTP 500
+    
Mockito.when(cnxn.getResponseCode()).thenReturn(HttpURLConnection.HTTP_INTERNAL_ERROR);
+
+    Mockito.when(cnxn.getOutputStream()).thenReturn(baos);
+
+    // Should throw an RTE
+    client.send(REQUEST.getBytes(StandardCharsets.UTF_8));
+  }
+
+}
+
+// End AvaticaHttpClientTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java
 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java
new file mode 100644
index 0000000..c64b32c
--- /dev/null
+++ 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/MetaDataOperationTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.avatica.remote;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link MetaDataOperation}
+ */
+public class MetaDataOperationTest {
+
+  @Test
+  public void testProtobufSerialization() {
+    for (MetaDataOperation metadataOp : MetaDataOperation.values()) {
+      assertEquals(metadataOp, 
MetaDataOperation.fromProto(metadataOp.toProto()));
+    }
+  }
+
+}
+
+// End MetaDataOperationTest.java

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java
 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java
index f91f597..3c25e13 100644
--- 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java
+++ 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufHandlerTest.java
@@ -84,7 +84,7 @@ public class ProtobufHandlerTest {
     frameRows.add(new Object[] {true, "my_string"});
 
     Meta.Frame frame = Frame.create(0, true, frameRows);
-    FetchResponse response = new FetchResponse(frame);
+    FetchResponse response = new FetchResponse(frame, false, false);
 
     when(translation.parseRequest(serializedRequest)).thenReturn(request);
     when(service.apply(request)).thenReturn(response);

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java
 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java
index 0db48ab..3ca44e9 100644
--- 
a/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java
+++ 
b/avatica/src/test/java/org/apache/calcite/avatica/remote/ProtobufTranslationImplTest.java
@@ -26,6 +26,7 @@ import org.apache.calcite.avatica.Meta.Frame;
 import org.apache.calcite.avatica.Meta.Signature;
 import org.apache.calcite.avatica.Meta.Style;
 import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.QueryState;
 import org.apache.calcite.avatica.remote.Service.CatalogsRequest;
 import org.apache.calcite.avatica.remote.Service.CloseConnectionRequest;
 import org.apache.calcite.avatica.remote.Service.CloseConnectionResponse;
@@ -51,6 +52,8 @@ import org.apache.calcite.avatica.remote.Service.Request;
 import org.apache.calcite.avatica.remote.Service.Response;
 import org.apache.calcite.avatica.remote.Service.ResultSetResponse;
 import org.apache.calcite.avatica.remote.Service.SchemasRequest;
+import org.apache.calcite.avatica.remote.Service.SyncResultsRequest;
+import org.apache.calcite.avatica.remote.Service.SyncResultsResponse;
 import org.apache.calcite.avatica.remote.Service.TableTypesRequest;
 import org.apache.calcite.avatica.remote.Service.TablesRequest;
 import org.apache.calcite.avatica.remote.Service.TypeInfoRequest;
@@ -198,9 +201,30 @@ public class ProtobufTranslationImplTest<T> {
             new ConnectionPropertiesImpl(Boolean.FALSE, Boolean.FALSE,
                 Integer.MAX_VALUE, "catalog", "schema")));
 
+    requests.add(new SyncResultsRequest("connectionId", 12345, 
getSqlQueryState(), 150));
+    requests.add(new SyncResultsRequest("connectionId2", 54321, 
getMetadataQueryState1(), 0));
+    requests.add(new SyncResultsRequest("connectionId3", 5, 
getMetadataQueryState2(), 10));
+
     return requests;
   }
 
+  private static QueryState getSqlQueryState() {
+    return new QueryState("SELECT * from TABLE");
+  }
+
+  private static QueryState getMetadataQueryState1() {
+    return new QueryState(MetaDataOperation.GET_COLUMNS, new Object[] {
+      "",
+      null,
+      "%",
+      "%"
+    });
+  }
+
+  private static QueryState getMetadataQueryState2() {
+    return new QueryState(MetaDataOperation.GET_CATALOGS, new Object[0]);
+  }
+
   private static List<Request> getRequestsWithNulls() {
     LinkedList<Request> requests = new LinkedList<>();
 
@@ -269,8 +293,10 @@ public class ProtobufTranslationImplTest<T> {
     }
     responses.add(new DatabasePropertyResponse(propertyMap));
 
-    responses.add(new ExecuteResponse(Arrays.asList(results1, results1, 
results1)));
-    responses.add(new FetchResponse(frame));
+    responses.add(new ExecuteResponse(Arrays.asList(results1, results1, 
results1), false));
+    responses.add(new FetchResponse(frame, false, false));
+    responses.add(new FetchResponse(frame, true, true));
+    responses.add(new FetchResponse(frame, false, true));
     responses.add(
         new PrepareResponse(
             new Meta.StatementHandle("connectionId", Integer.MAX_VALUE,
@@ -283,6 +309,13 @@ public class ProtobufTranslationImplTest<T> {
             ErrorResponse.UNKNOWN_ERROR_CODE, ErrorResponse.UNKNOWN_SQL_STATE,
             AvaticaSeverity.WARNING));
 
+    // No more results, statement not missing
+    responses.add(new SyncResultsResponse(false, false));
+    // Missing statement, no results
+    responses.add(new SyncResultsResponse(false, true));
+    // More results, no missing statement
+    responses.add(new SyncResultsResponse(true, false));
+
     return responses;
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java 
b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java
index 18d66e9..dccf889 100644
--- a/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java
+++ b/avatica/src/test/java/org/apache/calcite/avatica/test/JsonHandlerTest.java
@@ -111,6 +111,10 @@ public class JsonHandlerTest {
       return null;
     }
 
+    @Override public SyncResultsResponse apply(SyncResultsRequest request) {
+      return null;
+    }
+
     @Override public ExecuteResponse apply(ExecuteRequest request) {
       return null;
     }
@@ -145,7 +149,7 @@ public class JsonHandlerTest {
               RANDOM.nextInt(), false, signature, Meta.Frame.EMPTY, -1L);
 
       return new Service.ExecuteResponse(
-          Collections.singletonList(resultSetResponse));
+          Collections.singletonList(resultSetResponse), false);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java 
b/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java
index f44e871..5f677c1 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteJdbc41Factory.java
@@ -24,6 +24,7 @@ import org.apache.calcite.avatica.AvaticaPreparedStatement;
 import org.apache.calcite.avatica.AvaticaResultSetMetaData;
 import org.apache.calcite.avatica.AvaticaStatement;
 import org.apache.calcite.avatica.Meta;
+import org.apache.calcite.avatica.QueryState;
 import org.apache.calcite.avatica.UnregisteredDriver;
 
 import java.io.InputStream;
@@ -90,7 +91,7 @@ public class CalciteJdbc41Factory extends CalciteFactory {
         resultSetConcurrency, resultSetHoldability);
   }
 
-  public CalciteResultSet newResultSet(AvaticaStatement statement,
+  public CalciteResultSet newResultSet(AvaticaStatement statement, QueryState 
state,
       Meta.Signature signature, TimeZone timeZone, Meta.Frame firstFrame) {
     final ResultSetMetaData metaData =
         newResultSetMetaData(statement, signature);

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java 
b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
index 73c0880..e4f0ae5 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteMetaImpl.java
@@ -25,6 +25,8 @@ import org.apache.calcite.avatica.AvaticaUtils;
 import org.apache.calcite.avatica.ColumnMetaData;
 import org.apache.calcite.avatica.Meta;
 import org.apache.calcite.avatica.MetaImpl;
+import org.apache.calcite.avatica.NoSuchStatementException;
+import org.apache.calcite.avatica.QueryState;
 import org.apache.calcite.avatica.remote.TypedValue;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.linq4j.Enumerator;
@@ -528,7 +530,13 @@ public class CalciteMetaImpl extends MetaImpl {
         "TABLE_TYPE");
   }
 
-  @Override public Iterable<Object> createIterable(StatementHandle handle,
+  @Override public Iterable<Object> createIterable(StatementHandle handle, 
QueryState state,
+      Signature signature, List<TypedValue> parameterValues, Frame firstFrame) 
{
+    // Drop QueryState
+    return _createIterable(handle, signature, parameterValues, firstFrame);
+  }
+
+  Iterable<Object> _createIterable(StatementHandle handle,
       Signature signature, List<TypedValue> parameterValues, Frame firstFrame) 
{
     try {
       //noinspection unchecked
@@ -584,7 +592,7 @@ public class CalciteMetaImpl extends MetaImpl {
     final Iterator<Object> iterator;
     if (stmt.getResultSet() == null) {
       final Iterable<Object> iterable =
-          createIterable(h, signature, null, null);
+          _createIterable(h, signature, null, null);
       iterator = iterable.iterator();
       stmt.setResultSet(iterator);
     } else {
@@ -606,7 +614,7 @@ public class CalciteMetaImpl extends MetaImpl {
     final Iterator<Object> iterator;
 
     final Iterable<Object> iterable =
-        createIterable(h, signature, parameterValues, null);
+        _createIterable(h, signature, parameterValues, null);
     iterator = iterable.iterator();
     stmt.setResultSet(iterator);
 
@@ -644,6 +652,12 @@ public class CalciteMetaImpl extends MetaImpl {
     return DRIVER.connect(schema, typeFactory);
   }
 
+  public boolean syncResults(StatementHandle h, QueryState state, long offset)
+      throws NoSuchStatementException {
+    // Doesn't have application in Calcite itself.
+    throw new UnsupportedOperationException();
+  }
+
   /** Metadata describing a Calcite table. */
   private static class CalciteMetaTable extends MetaTable {
     private final Table calciteTable;

http://git-wip-us.apache.org/repos/asf/calcite/blob/97df1acb/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java 
b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
index be68916..c45b245 100644
--- a/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
+++ b/core/src/main/java/org/apache/calcite/jdbc/CalciteResultSet.java
@@ -46,7 +46,7 @@ public class CalciteResultSet extends AvaticaResultSet {
       CalcitePrepare.CalciteSignature calciteSignature,
       ResultSetMetaData resultSetMetaData, TimeZone timeZone,
       Meta.Frame firstFrame) {
-    super(statement, calciteSignature, resultSetMetaData, timeZone, 
firstFrame);
+    super(statement, null, calciteSignature, resultSetMetaData, timeZone, 
firstFrame);
   }
 
   @Override protected CalciteResultSet execute() throws SQLException {

Reply via email to