http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java new file mode 100644 index 0000000..ff27d05 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.jdbc; + +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.SQLFeatureNotSupportedException; +import java.sql.Statement; +import java.util.Objects; + +/** + * All we know about a statement. Encapsulates a {@link ResultSet}. + */ +public class StatementInfo { + private volatile Boolean relativeSupported = null; + + final Statement statement; // sometimes a PreparedStatement + private ResultSet resultSet; + private long position = 0; + + // True when setResultSet(ResultSet) is called to let us determine the difference between + // a null ResultSet (from an update) from the lack of a ResultSet. + private boolean resultsInitialized = false; + + public StatementInfo(Statement statement) { + this.statement = Objects.requireNonNull(statement); + } + + // Visible for testing + void setPosition(long position) { + this.position = position; + } + + // Visible for testing + long getPosition() { + return this.position; + } + + /** + * Set a ResultSet on this object. + * + * @param resultSet The current ResultSet + */ + public void setResultSet(ResultSet resultSet) { + resultsInitialized = true; + this.resultSet = resultSet; + } + + /** + * @return The {@link ResultSet} for this Statement, may be null. + */ + public ResultSet getResultSet() { + return this.resultSet; + } + + /** + * @return True if {@link #setResultSet(ResultSet)} was ever invoked. + */ + public boolean isResultSetInitialized() { + return resultsInitialized; + } + + /** + * @see ResultSet#next() + */ + public boolean next() throws SQLException { + return _next(resultSet); + } + + boolean _next(ResultSet results) throws SQLException { + boolean ret = results.next(); + position++; + return ret; + } + + /** + * Consumes <code>offset - position</code> elements from the {@link ResultSet}. + * + * @param offset The offset to advance to + * @return True if the resultSet was advanced to the current point, false if insufficient rows + * were present to advance to the requested offset. + */ + public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException { + if (offset < 0 || offset < position) { + throw new IllegalArgumentException("Offset should be " + + " non-negative and not less than the current position. " + offset + ", " + position); + } + if (position >= offset) { + return true; + } + + if (null == relativeSupported) { + Boolean moreResults = null; + synchronized (this) { + if (null == relativeSupported) { + try { + moreResults = advanceByRelative(results, offset); + relativeSupported = true; + } catch (SQLFeatureNotSupportedException e) { + relativeSupported = false; + } + } + } + + if (null != moreResults) { + // We figured out whether or not relative is supported. + // Make sure we actually do the necessary work. + if (!relativeSupported) { + // We avoided calling advanceByNext in the synchronized block earlier. + moreResults = advanceByNext(results, offset); + } + + return moreResults; + } + + // Another thread updated the RELATIVE_SUPPORTED before we did, fall through. + } + + if (relativeSupported) { + return advanceByRelative(results, offset); + } else { + return advanceByNext(results, offset); + } + } + + private boolean advanceByRelative(ResultSet results, long offset) throws SQLException { + long diff = offset - position; + while (diff > Integer.MAX_VALUE) { + if (!results.relative(Integer.MAX_VALUE)) { + // Avoid updating position until relative succeeds. + position += Integer.MAX_VALUE; + return false; + } + // Avoid updating position until relative succeeds. + position += Integer.MAX_VALUE; + diff -= Integer.MAX_VALUE; + } + boolean ret = results.relative((int) diff); + // Make sure we only update the position after successfully calling relative(int). + position += diff; + return ret; + } + + private boolean advanceByNext(ResultSet results, long offset) throws SQLException { + while (position < offset) { + // Advance while maintaining `position` + if (!_next(results)) { + return false; + } + } + + return true; + } +} + +// End StatementInfo.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java new file mode 100644 index 0000000..8b8fb76 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/jdbc/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Implements an Avatica provider on top of an existing JDBC data source. */ +package org.apache.calcite.avatica.jdbc; + + +// End package-info.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java new file mode 100644 index 0000000..42b13c9 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; + +import org.eclipse.jetty.server.Handler; + +/** + * A custom interface that extends the Jetty interface to enable extra control within Avatica. + */ +public interface AvaticaHandler extends Handler { + + void setServerRpcMetadata(RpcMetadataResponse metadata); + +} + +// End AvaticaHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java new file mode 100644 index 0000000..34a9333 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaJsonHandler.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.AvaticaUtils; +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.Timer; +import org.apache.calcite.avatica.metrics.Timer.Context; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; +import org.apache.calcite.avatica.remote.Handler.HandlerResponse; +import org.apache.calcite.avatica.remote.JsonHandler; +import org.apache.calcite.avatica.remote.Service; +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.calcite.avatica.remote.MetricsHelper.concat; + +import java.io.IOException; +import java.util.Objects; + +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * Jetty handler that executes Avatica JSON request-responses. + */ +public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareAvaticaHandler { + private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class); + + final Service service; + final JsonHandler jsonHandler; + + final MetricsSystem metrics; + final Timer requestTimer; + + final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; + + public AvaticaJsonHandler(Service service) { + this(service, NoopMetricsSystem.getInstance()); + } + + public AvaticaJsonHandler(Service service, MetricsSystem metrics) { + this.service = Objects.requireNonNull(service); + this.metrics = Objects.requireNonNull(metrics); + // Avatica doesn't have a Guava dependency + this.jsonHandler = new JsonHandler(service, this.metrics); + + // Metrics + this.requestTimer = this.metrics.getTimer( + concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); + + this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { + @Override public UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; + } + + public void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + try (final Context ctx = requestTimer.start()) { + response.setContentType("application/json;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + if (request.getMethod().equals("POST")) { + // First look for a request in the header, then look in the body. + // The latter allows very large requests without hitting HTTP 413. + String rawRequest = request.getHeader("request"); + if (rawRequest == null) { + // Avoid a new buffer creation for every HTTP request + final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); + try (ServletInputStream inputStream = request.getInputStream()) { + rawRequest = AvaticaUtils.readFully(inputStream, buffer); + } finally { + // Reset the offset into the buffer after we're done + buffer.reset(); + } + } + final String jsonRequest = + new String(rawRequest.getBytes("ISO-8859-1"), "UTF-8"); + LOG.trace("request: {}", jsonRequest); + + final HandlerResponse<String> jsonResponse = jsonHandler.apply(jsonRequest); + LOG.trace("response: {}", jsonResponse); + baseRequest.setHandled(true); + // Set the status code and write out the response. + response.setStatus(jsonResponse.getStatusCode()); + response.getWriter().println(jsonResponse.getResponse()); + } + } + } + + @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { + // Set the metadata for the normal service calls + service.setRpcMetadata(metadata); + // Also add it to the handler to include with exceptions + jsonHandler.setRpcMetadata(metadata); + } + + @Override public MetricsSystem getMetrics() { + return metrics; + } +} + +// End AvaticaJsonHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java new file mode 100644 index 0000000..27e73de --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.AvaticaUtils; +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.Timer; +import org.apache.calcite.avatica.metrics.Timer.Context; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; +import org.apache.calcite.avatica.remote.Handler.HandlerResponse; +import org.apache.calcite.avatica.remote.MetricsHelper; +import org.apache.calcite.avatica.remote.ProtobufHandler; +import org.apache.calcite.avatica.remote.ProtobufTranslation; +import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; +import org.apache.calcite.avatica.remote.Service; +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; +import org.apache.calcite.avatica.util.UnsynchronizedBuffer; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; + +import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * Jetty handler that executes Avatica JSON request-responses. + */ +public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAwareAvaticaHandler { + private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class); + + private final Service service; + private final ProtobufHandler pbHandler; + private final ProtobufTranslation protobufTranslation; + private final MetricsSystem metrics; + private final Timer requestTimer; + + final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; + + public AvaticaProtobufHandler(Service service) { + this(service, NoopMetricsSystem.getInstance()); + } + + public AvaticaProtobufHandler(Service service, MetricsSystem metrics) { + this.service = Objects.requireNonNull(service); + this.metrics = Objects.requireNonNull(metrics); + + this.requestTimer = this.metrics.getTimer( + MetricsHelper.concat(AvaticaProtobufHandler.class, + MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); + + this.protobufTranslation = new ProtobufTranslationImpl(); + this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics); + + this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { + @Override public UnsynchronizedBuffer initialValue() { + return new UnsynchronizedBuffer(); + } + }; + } + + public void handle(String target, Request baseRequest, + HttpServletRequest request, HttpServletResponse response) + throws IOException, ServletException { + try (final Context ctx = this.requestTimer.start()) { + response.setContentType("application/octet-stream;charset=utf-8"); + response.setStatus(HttpServletResponse.SC_OK); + if (request.getMethod().equals("POST")) { + byte[] requestBytes; + // Avoid a new buffer creation for every HTTP request + final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); + try (ServletInputStream inputStream = request.getInputStream()) { + requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer); + } finally { + buffer.reset(); + } + + HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes); + + baseRequest.setHandled(true); + response.setStatus(handlerResponse.getStatusCode()); + response.getOutputStream().write(handlerResponse.getResponse()); + } + } + } + + @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { + // Set the metadata for the normal service calls + service.setRpcMetadata(metadata); + // Also add it to the handler to include with exceptions + pbHandler.setRpcMetadata(metadata); + } + + @Override public MetricsSystem getMetrics() { + return this.metrics; + } + +} + +// End AvaticaProtobufHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java new file mode 100644 index 0000000..a574985 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; + +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Objects; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +/** + * An AvaticaHandler implementation that delegates to a provided Jetty Handler instance. + * + * <p>This implementation provides a no-op implementation for + * {@link #setServerRpcMetadata(org.apache.calcite.avatica.remote.Service.RpcMetadataResponse)}. + * + * Does not implement {@link MetricsAwareAvaticaHandler} as this implementation is only presented + * for backwards compatibility. + */ +public class DelegatingAvaticaHandler implements AvaticaHandler { + private static final Logger LOG = LoggerFactory.getLogger(DelegatingAvaticaHandler.class); + + private final Handler handler; + + public DelegatingAvaticaHandler(Handler handler) { + this.handler = Objects.requireNonNull(handler); + } + + @Override public void handle(String target, Request baseRequest, HttpServletRequest request, + HttpServletResponse response) throws IOException, ServletException { + handler.handle(target, baseRequest, request, response); + } + + @Override public void setServer(Server server) { + handler.setServer(server); + } + + @Override public Server getServer() { + return handler.getServer(); + } + + @Override public void destroy() { + handler.destroy(); + } + + @Override public void start() throws Exception { + handler.start(); + } + + @Override public void stop() throws Exception { + handler.stop(); + } + + @Override public boolean isRunning() { + return handler.isRunning(); + } + + @Override public boolean isStarted() { + return handler.isStarted(); + } + + @Override public boolean isStarting() { + return handler.isStarting(); + } + + @Override public boolean isStopping() { + return handler.isStopping(); + } + + @Override public boolean isStopped() { + return handler.isStopped(); + } + + @Override public boolean isFailed() { + return handler.isFailed(); + } + + @Override public void addLifeCycleListener(Listener listener) { + handler.addLifeCycleListener(listener); + } + + @Override public void removeLifeCycleListener(Listener listener) { + handler.removeLifeCycleListener(listener); + } + + @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { + LOG.warn("Setting RpcMetadata is not implemented for DelegatingAvaticaHandler"); + } + +} + +// End DelegatingAvaticaHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java new file mode 100644 index 0000000..b1fcb40 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.metrics.MetricsSystem; +import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration; +import org.apache.calcite.avatica.metrics.MetricsSystemFactory; +import org.apache.calcite.avatica.metrics.MetricsSystemLoader; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; +import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystemConfiguration; +import org.apache.calcite.avatica.remote.Driver; +import org.apache.calcite.avatica.remote.Service; + +import org.eclipse.jetty.server.Handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.ServiceLoader; + +/** + * Factory that instantiates the desired implementation, typically differing on the method + * used to serialize messages, for use in the Avatica server. + */ +public class HandlerFactory { + private static final Logger LOG = LoggerFactory.getLogger(HandlerFactory.class); + + /** + * Constructs the desired implementation for the given serialization method with metrics. + * + * @param service The underlying {@link Service}. + * @param serialization The desired message serialization. + * @return The {@link Handler}. + */ + public Handler getHandler(Service service, Driver.Serialization serialization) { + return getHandler(service, serialization, NoopMetricsSystemConfiguration.getInstance()); + } + + /** + * Constructs the desired implementation for the given serialization method with metrics. + * + * @param service The underlying {@link Service}. + * @param serialization The desired message serialization. + * @param metricsConfig Configuration for the {@link MetricsSystem}. + * @return The {@link Handler}. + */ + public Handler getHandler(Service service, Driver.Serialization serialization, + MetricsSystemConfiguration<?> metricsConfig) { + MetricsSystem metrics = MetricsSystemLoader.load(Objects.requireNonNull(metricsConfig)); + + switch (serialization) { + case JSON: + return new AvaticaJsonHandler(service, metrics); + case PROTOBUF: + return new AvaticaProtobufHandler(service, metrics); + default: + throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name()); + } + } + + /** + * Load a {@link MetricsSystem} using ServiceLoader to create a {@link MetricsSystemFactory}. + * + * @param config State to pass to the factory for initialization. + * @return A {@link MetricsSystem} instance. + */ + MetricsSystem loadMetricsSystem(MetricsSystemConfiguration<?> config) { + ServiceLoader<MetricsSystemFactory> loader = ServiceLoader.load(MetricsSystemFactory.class); + List<MetricsSystemFactory> availableFactories = new ArrayList<>(); + for (MetricsSystemFactory factory : loader) { + availableFactories.add(factory); + } + + if (1 == availableFactories.size()) { + // One and only one instance -- what we want + MetricsSystemFactory factory = availableFactories.get(0); + LOG.info("Loaded MetricsSystem {}", factory.getClass()); + return factory.create(config); + } else if (availableFactories.isEmpty()) { + // None-provided default to no metrics + LOG.info("No metrics implementation available on classpath. Using No-op implementation"); + return NoopMetricsSystem.getInstance(); + } else { + // Tell the user they're doing something wrong, and choose the first impl. + StringBuilder sb = new StringBuilder(); + for (MetricsSystemFactory factory : availableFactories) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(factory.getClass()); + } + LOG.warn("Found multiple MetricsSystemFactory implementations: {}." + + " Using No-op implementation", sb); + return NoopMetricsSystem.getInstance(); + } + } +} + +// End HandlerFactory.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java new file mode 100644 index 0000000..c81e899 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; + +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.handler.HandlerList; +import org.eclipse.jetty.util.thread.QueuedThreadPool; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +/** + * Avatica HTTP server. + * + * <p>If you need to change the server's configuration, override the + * {@link #configureConnector(ServerConnector, int)} method in a derived class. + */ +public class HttpServer { + private static final Logger LOG = LoggerFactory.getLogger(HttpServer.class); + + private Server server; + private int port = -1; + private final AvaticaHandler handler; + + @Deprecated + public HttpServer(Handler handler) { + this(wrapJettyHandler(handler)); + } + + public HttpServer(AvaticaHandler handler) { + this(0, handler); + } + + @Deprecated + public HttpServer(int port, Handler handler) { + this(port, wrapJettyHandler(handler)); + } + + public HttpServer(int port, AvaticaHandler handler) { + this.port = port; + this.handler = handler; + } + + private static AvaticaHandler wrapJettyHandler(Handler handler) { + if (handler instanceof AvaticaHandler) { + return (AvaticaHandler) handler; + } + // Backwards compatibility, noop's the AvaticaHandler interface + return new DelegatingAvaticaHandler(handler); + } + + public void start() { + if (server != null) { + throw new RuntimeException("Server is already started"); + } + + final QueuedThreadPool threadPool = new QueuedThreadPool(); + threadPool.setDaemon(true); + server = new Server(threadPool); + server.manage(threadPool); + + final ServerConnector connector = configureConnector(new ServerConnector(server), port); + + server.setConnectors(new Connector[] { connector }); + + final HandlerList handlerList = new HandlerList(); + handlerList.setHandlers(new Handler[] { handler, new DefaultHandler() }); + server.setHandler(handlerList); + try { + server.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } + port = connector.getLocalPort(); + + LOG.info("Service listening on port {}.", getPort()); + + // Set the information about the address for this server + try { + this.handler.setServerRpcMetadata(createRpcServerMetadata(connector)); + } catch (UnknownHostException e) { + // Failed to do the DNS lookup, bail out. + throw new RuntimeException(e); + } + } + + private RpcMetadataResponse createRpcServerMetadata(ServerConnector connector) throws + UnknownHostException { + String host = connector.getHost(); + if (null == host) { + // "null" means binding to all interfaces, we need to pick one so the client gets a real + // address and not "0.0.0.0" or similar. + host = InetAddress.getLocalHost().getHostName(); + } + + final int port = connector.getLocalPort(); + + return new RpcMetadataResponse(String.format("%s:%d", host, port)); + } + + /** + * Configures the server connector. + * + * <p>The default configuration sets a timeout of 1 minute and disables + * TCP linger time. + * + * <p>To change the configuration, override this method in a derived class. + * The overriding method must call its super method. + * + * @param connector connector to be configured + * @param port port number handed over in constructor + */ + protected ServerConnector configureConnector(ServerConnector connector, int port) { + connector.setIdleTimeout(60 * 1000); + connector.setSoLingerTime(-1); + connector.setPort(port); + return connector; + } + + public void stop() { + if (server == null) { + throw new RuntimeException("Server is already stopped"); + } + + LOG.info("Service terminating."); + try { + final Server server1 = server; + port = -1; + server = null; + server1.stop(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public void join() throws InterruptedException { + server.join(); + } + + public int getPort() { + return port; + } +} + +// End HttpServer.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java new file mode 100644 index 0000000..8b05931 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/Main.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.Meta; +import org.apache.calcite.avatica.remote.LocalService; +import org.apache.calcite.avatica.remote.Service; + +import org.eclipse.jetty.server.handler.AbstractHandler; + +import java.util.Arrays; + +/** + * Jetty handler that executes Avatica JSON request-responses. + */ +public class Main { + private Main() {} + + public static void main(String[] args) + throws InterruptedException, ClassNotFoundException, + IllegalAccessException, InstantiationException { + HttpServer server = start(args); + server.join(); + } + + /** + * Factory that instantiates Jetty Handlers + */ + public interface HandlerFactory { + AbstractHandler createHandler(Service service); + } + + private static final HandlerFactory JSON_HANDLER_FACTORY = new HandlerFactory() { + public AbstractHandler createHandler(Service service) { + return new AvaticaJsonHandler(service); + } + }; + + /** + * Creates and starts an {@link HttpServer} using JSON POJO serialization of requests/responses. + * + * <p>Arguments are as follows: + * <ul> + * <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class + * name + * <li>args[1+]: arguments passed along to + * {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)} + * </ul> + * + * @param args Command-line arguments + */ + public static HttpServer start(String[] args) throws ClassNotFoundException, + InstantiationException, IllegalAccessException { + return start(args, 8765, JSON_HANDLER_FACTORY); + } + + /** + * Creates and starts an {@link HttpServer} using the given factory to create the Handler. + * + * <p>Arguments are as follows: + * <ul> + * <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class + * name + * <li>args[1+]: arguments passed along to + * {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)} + * </ul> + * + * @param args Command-line arguments + * @param port Server port to bind + * @param handlerFactory Factory to create the handler used by the server + */ + public static HttpServer start(String[] args, int port, HandlerFactory handlerFactory) + throws ClassNotFoundException, InstantiationException, + IllegalAccessException { + String factoryClassName = args[0]; + Class<?> factoryClass = Class.forName(factoryClassName); + Meta.Factory factory = (Meta.Factory) factoryClass.newInstance(); + Meta meta = factory.create(Arrays.asList(args).subList(1, args.length)); + Service service = new LocalService(meta); + HttpServer server = new HttpServer(port, handlerFactory.createHandler(service)); + server.start(); + return server; + } +} + +// End Main.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java new file mode 100644 index 0000000..0914dbd --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.metrics.MetricsSystem; + +/** + * An {@link AvaticaHandler} that is capable of collecting metrics. + */ +public interface MetricsAwareAvaticaHandler extends AvaticaHandler { + + /** + * General prefix for all metrics in a handler. + */ + String HANDLER_PREFIX = "Handler."; + + /** + * Name for timing requests from users + */ + String REQUEST_TIMER_NAME = HANDLER_PREFIX + "RequestTimings"; + + /** + * @return An instance of the {@link MetricsSystem} for this AvaticaHandler. + */ + MetricsSystem getMetrics(); + +} + +// End MetricsAwareAvaticaHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java new file mode 100644 index 0000000..f2b8728 --- /dev/null +++ b/avatica/server/src/main/java/org/apache/calcite/avatica/server/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Avatica server that listens for HTTP requests. + */ +@PackageMarker +package org.apache.calcite.avatica.server; + +import org.apache.calcite.avatica.util.PackageMarker; + +// End package-info.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java new file mode 100644 index 0000000..ba4c5b8 --- /dev/null +++ b/avatica/server/src/test/java/org/apache/calcite/avatica/ConnectionSpec.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import net.hydromatic.scott.data.hsqldb.ScottHsqldb; + +import java.util.concurrent.locks.ReentrantLock; + +/** Information necessary to create a JDBC connection. Specify one to run + * tests against a different database. (hsqldb is the default.) */ +public class ConnectionSpec { + public final String url; + public final String username; + public final String password; + public final String driver; + + // CALCITE-687 HSQLDB seems to fail oddly when multiple tests are run concurrently + private static final ReentrantLock HSQLDB_LOCK = new ReentrantLock(); + + public ConnectionSpec(String url, String username, String password, + String driver) { + this.url = url; + this.username = username; + this.password = password; + this.driver = driver; + } + + public static final ConnectionSpec HSQLDB = + new ConnectionSpec(ScottHsqldb.URI, ScottHsqldb.USER, + ScottHsqldb.PASSWORD, "org.hsqldb.jdbcDriver"); + + /** + * Return a lock used for controlling concurrent access to the database as it has been observed + * that concurrent access is causing problems with HSQLDB. + */ + public static ReentrantLock getDatabaseLock() { + return HSQLDB_LOCK; + } +} + +// End ConnectionSpec.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java ---------------------------------------------------------------------- diff --git a/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java new file mode 100644 index 0000000..9749ef6 --- /dev/null +++ b/avatica/server/src/test/java/org/apache/calcite/avatica/RemoteDriverMockTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.calcite.avatica; + +import org.apache.calcite.avatica.remote.MockJsonService; +import org.apache.calcite.avatica.remote.MockProtobufService.MockProtobufServiceFactory; + +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ParameterMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * RemoteDriver tests that use a Mock implementation of a Connection. + */ +@RunWith(Parameterized.class) +public class RemoteDriverMockTest { + public static final String MJS = MockJsonService.Factory.class.getName(); + public static final String MPBS = MockProtobufServiceFactory.class.getName(); + + private static Connection mjs() throws SQLException { + return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MJS); + } + + private static Connection mpbs() throws SQLException { + return DriverManager.getConnection("jdbc:avatica:remote:factory=" + MPBS); + } + + @Parameters + public static List<Object[]> parameters() { + List<Object[]> parameters = new ArrayList<>(); + + parameters.add(new Object[] {new Callable<Connection>() { + public Connection call() throws SQLException { + return mjs(); + } + } }); + + parameters.add(new Object[] {new Callable<Connection>() { + public Connection call() throws SQLException { + return mpbs(); + } + } }); + + return parameters; + } + + private final Callable<Connection> connectionFunctor; + + public RemoteDriverMockTest(Callable<Connection> functor) { + this.connectionFunctor = functor; + } + + private Connection getMockConnection() { + try { + return connectionFunctor.call(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test public void testRegister() throws Exception { + final Connection connection = getMockConnection(); + assertThat(connection.isClosed(), is(false)); + connection.close(); + assertThat(connection.isClosed(), is(true)); + } + + @Test public void testSchemas() throws Exception { + final Connection connection = getMockConnection(); + final ResultSet resultSet = + connection.getMetaData().getSchemas(null, null); + assertFalse(resultSet.next()); + final ResultSetMetaData metaData = resultSet.getMetaData(); + assertTrue(metaData.getColumnCount() >= 2); + assertEquals("TABLE_CATALOG", metaData.getColumnName(1)); + assertEquals("TABLE_SCHEM", metaData.getColumnName(2)); + resultSet.close(); + connection.close(); + } + + @Test public void testTables() throws Exception { + final Connection connection = getMockConnection(); + final ResultSet resultSet = + connection.getMetaData().getTables(null, null, null, new String[0]); + assertFalse(resultSet.next()); + final ResultSetMetaData metaData = resultSet.getMetaData(); + assertTrue(metaData.getColumnCount() >= 3); + assertEquals("TABLE_CAT", metaData.getColumnName(1)); + assertEquals("TABLE_SCHEM", metaData.getColumnName(2)); + assertEquals("TABLE_NAME", metaData.getColumnName(3)); + resultSet.close(); + connection.close(); + } + + @Ignore + @Test public void testNoFactory() throws Exception { + final Connection connection = + DriverManager.getConnection("jdbc:avatica:remote:"); + assertThat(connection.isClosed(), is(false)); + final ResultSet resultSet = connection.getMetaData().getSchemas(); + assertFalse(resultSet.next()); + final ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(2, metaData.getColumnCount()); + assertEquals("TABLE_SCHEM", metaData.getColumnName(1)); + assertEquals("TABLE_CATALOG", metaData.getColumnName(2)); + resultSet.close(); + connection.close(); + assertThat(connection.isClosed(), is(true)); + } + + @Ignore + @Test public void testCatalogsMock() throws Exception { + final Connection connection = getMockConnection(); + assertThat(connection.isClosed(), is(false)); + final ResultSet resultSet = connection.getMetaData().getSchemas(); + assertFalse(resultSet.next()); + final ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(2, metaData.getColumnCount()); + assertEquals("TABLE_SCHEM", metaData.getColumnName(1)); + assertEquals("TABLE_CATALOG", metaData.getColumnName(2)); + resultSet.close(); + connection.close(); + assertThat(connection.isClosed(), is(true)); + } + + @Ignore + @Test public void testStatementExecuteQueryMock() throws Exception { + checkStatementExecuteQuery(getMockConnection(), false); + } + + @Ignore + @Test public void testPrepareExecuteQueryMock() throws Exception { + checkStatementExecuteQuery(getMockConnection(), true); + } + + private void checkStatementExecuteQuery(Connection connection, + boolean prepare) throws SQLException { + final String sql = "select * from (\n" + + " values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)"; + final Statement statement; + final ResultSet resultSet; + final ParameterMetaData parameterMetaData; + if (prepare) { + final PreparedStatement ps = connection.prepareStatement(sql); + statement = ps; + parameterMetaData = ps.getParameterMetaData(); + resultSet = ps.executeQuery(); + } else { + statement = connection.createStatement(); + parameterMetaData = null; + resultSet = statement.executeQuery(sql); + } + if (parameterMetaData != null) { + assertThat(parameterMetaData.getParameterCount(), equalTo(0)); + } + final ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(2, metaData.getColumnCount()); + assertEquals("C1", metaData.getColumnName(1)); + assertEquals("C2", metaData.getColumnName(2)); + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertTrue(resultSet.next()); + assertFalse(resultSet.next()); + resultSet.close(); + statement.close(); + connection.close(); + } + + @Test public void testResultSetsFinagled() throws Exception { + // These values specified in MockJsonService + final String table = "my_table"; + final long value = 10; + + final Connection connection = getMockConnection(); + // Not an accurate ResultSet per JDBC, but close enough for testing. + ResultSet results = connection.getMetaData().getColumns(null, null, table, null); + assertTrue(results.next()); + assertEquals(table, results.getString(1)); + assertEquals(value, results.getLong(2)); + } + +} + +// End RemoteDriverMockTest.java
