http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/KerberosConnection.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/KerberosConnection.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/KerberosConnection.java deleted file mode 100644 index da701dc..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/KerberosConnection.java +++ /dev/null @@ -1,400 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.lang.Thread.UncaughtExceptionHandler; -import java.security.Principal; -import java.util.AbstractMap; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map.Entry; -import java.util.Objects; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.security.auth.Subject; -import javax.security.auth.kerberos.KerberosPrincipal; -import javax.security.auth.kerberos.KerberosTicket; -import javax.security.auth.login.AppConfigurationEntry; -import javax.security.auth.login.Configuration; -import javax.security.auth.login.LoginContext; -import javax.security.auth.login.LoginException; - -/** - * A utility to perform Kerberos logins and renewals. - */ -public class KerberosConnection { - private static final Logger LOG = LoggerFactory.getLogger(KerberosConnection.class); - - private static final String IBM_KRB5_LOGIN_MODULE = - "com.ibm.security.auth.module.Krb5LoginModule"; - private static final String SUN_KRB5_LOGIN_MODULE = - "com.sun.security.auth.module.Krb5LoginModule"; - private static final String JAAS_CONF_NAME = "AvaticaKeytabConf"; - private static final String RENEWAL_THREAD_NAME = "Avatica Kerberos Renewal Thread"; - - /** The percentage of the Kerberos ticket's lifetime which we should start trying to renew it */ - public static final float PERCENT_OF_LIFETIME_TO_RENEW = 0.80f; - /** How long should we sleep between checks to renew the Kerberos ticket */ - public static final long RENEWAL_PERIOD = 30L; - - private final String principal; - private final Configuration jaasConf; - private Subject subject; - private RenewalTask renewalTask; - private Thread renewalThread; - - /** - * Constructs an instance. - * - * @param principal The Kerberos principal - * @param keytab The keytab containing keys for the Kerberos principal - */ - public KerberosConnection(String principal, File keytab) { - this.principal = Objects.requireNonNull(principal); - this.jaasConf = new KeytabJaasConf(principal, Objects.requireNonNull(keytab)); - } - - public synchronized Subject getSubject() { - return this.subject; - } - - /** - * Perform a Kerberos login and launch a daemon thread to periodically perfrom renewals of that - * Kerberos login. Exceptions are intentionally caught and rethrown as unchecked exceptions as - * there is nothing Avatica itself can do if the Kerberos login fails. - * - * @throws RuntimeException If the Kerberos login fails - */ - public synchronized void login() { - final Entry<LoginContext, Subject> securityMaterial = performKerberosLogin(); - subject = securityMaterial.getValue(); - // Launch a thread to periodically perform renewals - final Entry<RenewalTask, Thread> renewalMaterial = createRenewalThread( - securityMaterial.getKey(), subject, KerberosConnection.RENEWAL_PERIOD); - renewalTask = renewalMaterial.getKey(); - renewalThread = renewalMaterial.getValue(); - renewalThread.start(); - } - - /** - * Performs a Kerberos login given the {@code principal} and {@code keytab}. - * - * @return The {@code Subject} and {@code LoginContext} from the successful login. - * @throws RuntimeException if the login failed - */ - Entry<LoginContext, Subject> performKerberosLogin() { - // Loosely based on Apache Kerby's JaasKrbUtil class - // Synchronized by the caller - - // Create a KerberosPrincipal given the principal. - final Set<Principal> principals = new HashSet<Principal>(); - principals.add(new KerberosPrincipal(principal)); - - final Subject subject = new Subject(false, principals, new HashSet<Object>(), - new HashSet<Object>()); - - try { - return login(null, jaasConf, subject); - } catch (Exception e) { - throw new RuntimeException("Failed to perform Kerberos login"); - } - } - - /** - * Performs a kerberos login, possibly logging out first. - * - * @param prevContext The LoginContext from the previous login, or null - * @param conf JAAS Configuration object - * @param subject The JAAS Subject - * @return The context and subject from the login - * @throws LoginException If the login failed. - */ - Entry<LoginContext, Subject> login(LoginContext prevContext, Configuration conf, - Subject subject) throws LoginException { - // Is synchronized by the caller - - // If a context was provided, perform a logout first - if (null != prevContext) { - prevContext.logout(); - } - - // Create a LoginContext given the Configuration and Subject - LoginContext loginContext = createLoginContext(conf); - // Invoke the login - loginContext.login(); - // Get the Subject from the context and verify it's non-null (null would imply failure) - Subject loggedInSubject = loginContext.getSubject(); - if (null == loggedInSubject) { - throw new RuntimeException("Failed to perform Kerberos login"); - } - - // Send it back to the caller to use with launchRenewalThread - return new AbstractMap.SimpleEntry<>(loginContext, loggedInSubject); - } - - // Enables mocking for unit tests - LoginContext createLoginContext(Configuration conf) throws LoginException { - return new LoginContext(JAAS_CONF_NAME, subject, null, conf); - } - - /** - * Launches a thread to periodically check the current ticket's lifetime and perform a relogin - * as necessary. - * - * @param originalContext The original login's context. - * @param originalSubject The original login's subject. - * @param renewalPeriod The amount of time to sleep inbetween checks to renew - */ - Entry<RenewalTask, Thread> createRenewalThread(LoginContext originalContext, - Subject originalSubject, long renewalPeriod) { - RenewalTask task = new RenewalTask(this, originalContext, originalSubject, jaasConf, - renewalPeriod); - Thread t = new Thread(task); - - // Don't prevent the JVM from existing - t.setDaemon(true); - // Log an error message if this thread somehow dies - t.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { - @Override public void uncaughtException(Thread t, Throwable e) { - LOG.error("Uncaught exception from Kerberos credential renewal thread", e); - } - }); - t.setName(RENEWAL_THREAD_NAME); - - return new AbstractMap.SimpleEntry<>(task, t); - } - - /** - * Stops the Kerberos renewal thread if it is still running. If the thread was already started - * or never started, this method does nothing. - */ - public void stopRenewalThread() { - if (null != renewalTask && null != renewalThread) { - LOG.debug("Informing RenewalTask to gracefully stop and interrupting the renewal thread."); - renewalTask.asyncStop(); - - long now = System.currentTimeMillis(); - long until = now + 5000; - while (now < until) { - if (renewalThread.isAlive()) { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - - now = System.currentTimeMillis(); - } else { - break; - } - } - - if (renewalThread.isAlive()) { - LOG.warn("Renewal thread failed to gracefully stop, interrupting it"); - renewalThread.interrupt(); - try { - renewalThread.join(5000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - - // What more could we do? - if (renewalThread.isAlive()) { - LOG.warn("Renewal thread failed to gracefully and ungracefully stop, proceeding."); - } - - renewalTask = null; - renewalThread = null; - } else { - LOG.warn("Renewal thread was never started or already stopped."); - } - } - - /** - * Runnable for performing Kerberos renewals. - */ - static class RenewalTask implements Runnable { - private static final Logger RENEWAL_LOG = LoggerFactory.getLogger(RenewalTask.class); - // Mutable variables -- change as re-login occurs - private LoginContext context; - private Subject subject; - private final KerberosConnection utilInstance; - private final Configuration conf; - private final long renewalPeriod; - private final AtomicBoolean keepRunning = new AtomicBoolean(true); - - public RenewalTask(KerberosConnection utilInstance, LoginContext context, Subject subject, - Configuration conf, long renewalPeriod) { - this.utilInstance = Objects.requireNonNull(utilInstance); - this.context = Objects.requireNonNull(context); - this.subject = Objects.requireNonNull(subject); - this.conf = Objects.requireNonNull(conf); - this.renewalPeriod = renewalPeriod; - } - - @Override public void run() { - while (keepRunning.get() && !Thread.currentThread().isInterrupted()) { - RENEWAL_LOG.debug("Checking if Kerberos ticket should be renewed"); - // The current time - final long now = System.currentTimeMillis(); - - // Find the TGT in the Subject for the principal we were given. - Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class); - KerberosTicket activeTicket = null; - for (KerberosTicket ticket : tickets) { - if (isTGSPrincipal(ticket.getServer())) { - activeTicket = ticket; - break; - } - } - - // If we have no active ticket, immediately renew and check again to make sure we have - // a valid ticket now. - if (null == activeTicket) { - RENEWAL_LOG.debug("There is no active Kerberos ticket, renewing now"); - renew(); - continue; - } - - // Only renew when we hit a certain threshold of the current ticket's lifetime. - // We want to limit the number of renewals we have to invoke. - if (shouldRenew(activeTicket.getStartTime().getTime(), - activeTicket.getEndTime().getTime(), now)) { - RENEWAL_LOG.debug("The current ticket should be renewed now"); - renew(); - } - - // Sleep until we check again - waitForNextCheck(renewalPeriod); - } - } - - /** - * Computes whether or not the ticket should be renewed based on the lifetime of the ticket - * and the current time. - * - * @param start The start time of the ticket's validity in millis - * @param end The end time of the ticket's validity in millis - * @param now Milliseconds since the epoch - * @return True if renewal should occur, false otherwise - */ - boolean shouldRenew(final long start, final long end, long now) { - final long lifetime = end - start; - final long renewAfter = start + (long) (lifetime * PERCENT_OF_LIFETIME_TO_RENEW); - return now >= renewAfter; - } - - /** - * Logout and log back in with the Kerberos identity. - */ - void renew() { - try { - // Lock on the instance of KerberosUtil - synchronized (utilInstance) { - Entry<LoginContext, Subject> pair = utilInstance.login(context, conf, subject); - context = pair.getKey(); - subject = pair.getValue(); - } - } catch (Exception e) { - throw new RuntimeException("Failed to perform kerberos login"); - } - } - - /** - * Wait the given amount of time. - * - * @param renewalPeriod The number of milliseconds to wait - */ - void waitForNextCheck(long renewalPeriod) { - try { - Thread.sleep(renewalPeriod); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - return; - } - } - - void asyncStop() { - keepRunning.set(false); - } - } - - /** - * Computes if the given {@code principal} is the ticket-granting system's principal ("krbtgt"). - * - * @param principal A {@link KerberosPrincipal}. - * @return True if {@code principal} is the TGS principal, false otherwise. - */ - static boolean isTGSPrincipal(KerberosPrincipal principal) { - if (principal == null) { - return false; - } - - if (principal.getName().equals("krbtgt/" + principal.getRealm() + "@" + principal.getRealm())) { - return true; - } - - return false; - } - - /** - * Javax Configuration for performing a keytab-based Kerberos login. - */ - static class KeytabJaasConf extends Configuration { - private String principal; - private File keytabFile; - - KeytabJaasConf(String principal, File keytab) { - this.principal = principal; - this.keytabFile = keytab; - } - - @Override public AppConfigurationEntry[] getAppConfigurationEntry(String name) { - HashMap<String, String> options = new HashMap<String, String>(); - options.put("keyTab", keytabFile.getAbsolutePath()); - options.put("principal", principal); - options.put("useKeyTab", "true"); - options.put("storeKey", "true"); - options.put("doNotPrompt", "true"); - options.put("renewTGT", "false"); - options.put("refreshKrb5Config", "true"); - options.put("isInitiator", "true"); - - return new AppConfigurationEntry[] {new AppConfigurationEntry(getKrb5LoginModuleName(), - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, options)}; - } - } - - /** - * Returns the KRB5 LoginModule implementation. This is JVM-vendor dependent. - * - * @return The class name of the KRB5 LoginModule - */ - static String getKrb5LoginModuleName() { - return System.getProperty("java.vendor").contains("IBM") ? IBM_KRB5_LOGIN_MODULE - : SUN_KRB5_LOGIN_MODULE; - } -} - -// End KerberosConnection.java
http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java deleted file mode 100644 index 0af6300..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalJsonService.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import java.io.IOException; -import java.io.StringWriter; - -/** - * Implementation of {@link org.apache.calcite.avatica.remote.Service} - * that goes to an in-process instance of {@code Service}. - */ -public class LocalJsonService extends JsonService { - private final Service service; - - public LocalJsonService(Service service) { - this.service = service; - } - - @Override public String apply(String request) { - try { - Request request2 = MAPPER.readValue(request, Request.class); - Response response2 = request2.accept(service); - final StringWriter w = new StringWriter(); - MAPPER.writeValue(w, response2); - return w.toString(); - } catch (IOException e) { - throw handle(e); - } - } -} - -// End LocalJsonService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java deleted file mode 100644 index 76e2392..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalProtobufService.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import java.io.IOException; - -/** - * A Service implementation that performs protocol buffer serialization on request and responses - * on either side of computing a response from a request to mimic some transport to a server which - * would normally perform such computation. - */ -public class LocalProtobufService extends ProtobufService { - private final Service service; - private final ProtobufTranslation translation; - - public LocalProtobufService(Service service, ProtobufTranslation translation) { - this.service = service; - this.translation = translation; - } - - @Override public Response _apply(Request request) { - try { - // Serialize the request to "send to the server" - byte[] serializedRequest = translation.serializeRequest(request); - - // *some transport would normally happen here* - - // Fake deserializing that request somewhere else - Request request2 = translation.parseRequest(serializedRequest); - - // Serialize the response from the service to "send to the client" - byte[] serializedResponse = translation.serializeResponse(request2.accept(service)); - - // *some transport would normally happen here* - - // Deserialize the response on "the client" - return translation.parseResponse(serializedResponse); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} - -// End LocalProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java deleted file mode 100644 index 929830b..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/LocalService.java +++ /dev/null @@ -1,376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaUtils; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.Meta.ExecuteBatchResult; -import org.apache.calcite.avatica.MissingResultsException; -import org.apache.calcite.avatica.NoSuchStatementException; -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.Timer; -import org.apache.calcite.avatica.metrics.Timer.Context; -import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; - -import static org.apache.calcite.avatica.remote.MetricsHelper.concat; - -/** - * Implementation of {@link Service} that talks to a local {@link Meta}. - */ -public class LocalService implements Service { - final Meta meta; - final MetricsSystem metrics; - - private final Timer executeTimer; - private final Timer commitTimer; - private final Timer prepareTimer; - private final Timer prepareAndExecuteTimer; - private final Timer connectionSyncTimer; - - private RpcMetadataResponse serverLevelRpcMetadata; - - public LocalService(Meta meta) { - this(meta, NoopMetricsSystem.getInstance()); - } - - public LocalService(Meta meta, MetricsSystem metrics) { - this.meta = meta; - this.metrics = Objects.requireNonNull(metrics); - - this.executeTimer = this.metrics.getTimer(name("Execute")); - this.commitTimer = this.metrics.getTimer(name("Commit")); - this.prepareTimer = this.metrics.getTimer(name("Prepare")); - this.prepareAndExecuteTimer = this.metrics.getTimer(name("PrepareAndExecute")); - this.connectionSyncTimer = this.metrics.getTimer(name("ConnectionSync")); - } - - private static String name(String timer) { - return concat(LocalService.class, timer); - } - - @Override public void setRpcMetadata(RpcMetadataResponse serverLevelRpcMetadata) { - this.serverLevelRpcMetadata = Objects.requireNonNull(serverLevelRpcMetadata); - } - - private static <E> List<E> list(Iterable<E> iterable) { - if (iterable instanceof List) { - return (List<E>) iterable; - } - final List<E> rowList = new ArrayList<>(); - for (E row : iterable) { - rowList.add(row); - } - return rowList; - } - - /** Converts a result set (not serializable) into a serializable response. */ - public ResultSetResponse toResponse(Meta.MetaResultSet resultSet) { - if (resultSet.updateCount != -1) { - return new ResultSetResponse(resultSet.connectionId, - resultSet.statementId, resultSet.ownStatement, null, null, - resultSet.updateCount, serverLevelRpcMetadata); - } - - Meta.Signature signature = resultSet.signature; - Meta.CursorFactory cursorFactory = resultSet.signature.cursorFactory; - Meta.Frame frame = null; - int updateCount = -1; - final List<Object> list; - - if (resultSet.firstFrame != null) { - list = list(resultSet.firstFrame.rows); - switch (cursorFactory.style) { - case ARRAY: - cursorFactory = Meta.CursorFactory.LIST; - break; - case MAP: - case LIST: - break; - case RECORD: - cursorFactory = Meta.CursorFactory.LIST; - break; - default: - cursorFactory = Meta.CursorFactory.map(cursorFactory.fieldNames); - } - - final boolean done = resultSet.firstFrame.done; - - frame = new Meta.Frame(0, done, list); - updateCount = -1; - - if (signature.statementType != null) { - if (signature.statementType.canUpdate()) { - frame = null; - updateCount = ((Number) ((List) list.get(0)).get(0)).intValue(); - } - } - } else { - cursorFactory = Meta.CursorFactory.LIST; - } - - if (cursorFactory != resultSet.signature.cursorFactory) { - signature = signature.setCursorFactory(cursorFactory); - } - - return new ResultSetResponse(resultSet.connectionId, resultSet.statementId, - resultSet.ownStatement, signature, frame, updateCount, serverLevelRpcMetadata); - } - - public ResultSetResponse apply(CatalogsRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = meta.getCatalogs(ch); - return toResponse(resultSet); - } - - public ResultSetResponse apply(SchemasRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = - meta.getSchemas(ch, request.catalog, Meta.Pat.of(request.schemaPattern)); - return toResponse(resultSet); - } - - public ResultSetResponse apply(TablesRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = - meta.getTables(ch, - request.catalog, - Meta.Pat.of(request.schemaPattern), - Meta.Pat.of(request.tableNamePattern), - request.typeList); - return toResponse(resultSet); - } - - public ResultSetResponse apply(TableTypesRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = meta.getTableTypes(ch); - return toResponse(resultSet); - } - - public ResultSetResponse apply(TypeInfoRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = meta.getTypeInfo(ch); - return toResponse(resultSet); - } - - public ResultSetResponse apply(ColumnsRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.MetaResultSet resultSet = - meta.getColumns(ch, - request.catalog, - Meta.Pat.of(request.schemaPattern), - Meta.Pat.of(request.tableNamePattern), - Meta.Pat.of(request.columnNamePattern)); - return toResponse(resultSet); - } - - public PrepareResponse apply(PrepareRequest request) { - try (final Context ignore = prepareTimer.start()) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.StatementHandle h = - meta.prepare(ch, request.sql, request.maxRowCount); - return new PrepareResponse(h, serverLevelRpcMetadata); - } - } - - public ExecuteResponse apply(PrepareAndExecuteRequest request) { - try (final Context ignore = prepareAndExecuteTimer.start()) { - final Meta.StatementHandle sh = - new Meta.StatementHandle(request.connectionId, request.statementId, null); - try { - final Meta.ExecuteResult executeResult = - meta.prepareAndExecute(sh, request.sql, request.maxRowCount, - request.maxRowsInFirstFrame, new Meta.PrepareCallback() { - @Override public Object getMonitor() { - return LocalService.class; - } - - @Override public void clear() { - } - - @Override public void assign(Meta.Signature signature, - Meta.Frame firstFrame, long updateCount) { - } - - @Override public void execute() { - } - }); - final List<ResultSetResponse> results = new ArrayList<>(); - for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { - results.add(toResponse(metaResultSet)); - } - return new ExecuteResponse(results, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - // The Statement doesn't exist anymore, bubble up this information - return new ExecuteResponse(null, true, serverLevelRpcMetadata); - } - } - } - - public FetchResponse apply(FetchRequest request) { - final Meta.StatementHandle h = new Meta.StatementHandle( - request.connectionId, request.statementId, null); - try { - final Meta.Frame frame = - meta.fetch(h, - request.offset, - request.fetchMaxRowCount); - return new FetchResponse(frame, false, false, serverLevelRpcMetadata); - } catch (NullPointerException | NoSuchStatementException e) { - // The Statement doesn't exist anymore, bubble up this information - return new FetchResponse(null, true, true, serverLevelRpcMetadata); - } catch (MissingResultsException e) { - return new FetchResponse(null, false, true, serverLevelRpcMetadata); - } - } - - public ExecuteResponse apply(ExecuteRequest request) { - try (final Context ignore = executeTimer.start()) { - try { - final Meta.ExecuteResult executeResult = meta.execute(request.statementHandle, - request.parameterValues, AvaticaUtils.toSaturatedInt(request.maxRowCount)); - - final List<ResultSetResponse> results = new ArrayList<>(executeResult.resultSets.size()); - for (Meta.MetaResultSet metaResultSet : executeResult.resultSets) { - results.add(toResponse(metaResultSet)); - } - return new ExecuteResponse(results, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - return new ExecuteResponse(null, true, serverLevelRpcMetadata); - } - } - } - - public CreateStatementResponse apply(CreateStatementRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.StatementHandle h = meta.createStatement(ch); - return new CreateStatementResponse(h.connectionId, h.id, serverLevelRpcMetadata); - } - - public CloseStatementResponse apply(CloseStatementRequest request) { - final Meta.StatementHandle h = new Meta.StatementHandle( - request.connectionId, request.statementId, null); - meta.closeStatement(h); - return new CloseStatementResponse(serverLevelRpcMetadata); - } - - public OpenConnectionResponse apply(OpenConnectionRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - meta.openConnection(ch, request.info); - return new OpenConnectionResponse(serverLevelRpcMetadata); - } - - public CloseConnectionResponse apply(CloseConnectionRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - meta.closeConnection(ch); - return new CloseConnectionResponse(serverLevelRpcMetadata); - } - - public ConnectionSyncResponse apply(ConnectionSyncRequest request) { - try (final Context ignore = connectionSyncTimer.start()) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - final Meta.ConnectionProperties connProps = - meta.connectionSync(ch, request.connProps); - return new ConnectionSyncResponse(connProps, serverLevelRpcMetadata); - } - } - - public DatabasePropertyResponse apply(DatabasePropertyRequest request) { - final Meta.ConnectionHandle ch = - new Meta.ConnectionHandle(request.connectionId); - return new DatabasePropertyResponse(meta.getDatabaseProperties(ch), serverLevelRpcMetadata); - } - - public SyncResultsResponse apply(SyncResultsRequest request) { - final Meta.StatementHandle h = new Meta.StatementHandle( - request.connectionId, request.statementId, null); - SyncResultsResponse response; - try { - // Set success on the cached statement - response = new SyncResultsResponse(meta.syncResults(h, request.state, request.offset), false, - serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - // Tried to sync results on a statement which wasn't cached - response = new SyncResultsResponse(false, true, serverLevelRpcMetadata); - } - - return response; - } - - public CommitResponse apply(CommitRequest request) { - try (final Context ignore = commitTimer.start()) { - meta.commit(new Meta.ConnectionHandle(request.connectionId)); - - // If commit() errors, let the ErrorResponse be sent back via an uncaught Exception. - return new CommitResponse(); - } - } - - public RollbackResponse apply(RollbackRequest request) { - meta.rollback(new Meta.ConnectionHandle(request.connectionId)); - - // If rollback() errors, let the ErrorResponse be sent back via an uncaught Exception. - return new RollbackResponse(); - } - - public ExecuteBatchResponse apply(PrepareAndExecuteBatchRequest request) { - final Meta.StatementHandle h = new Meta.StatementHandle(request.connectionId, - request.statementId, null); - try { - ExecuteBatchResult result = meta.prepareAndExecuteBatch(h, request.sqlCommands); - return new ExecuteBatchResponse(request.connectionId, request.statementId, - result.updateCounts, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - return new ExecuteBatchResponse(request.connectionId, request.statementId, null, true, - serverLevelRpcMetadata); - } - } - - public ExecuteBatchResponse apply(ExecuteBatchRequest request) { - final Meta.StatementHandle h = new Meta.StatementHandle(request.connectionId, - request.statementId, null); - try { - ExecuteBatchResult result; - if (request.hasProtoUpdateBatches() && meta instanceof ProtobufMeta) { - result = ((ProtobufMeta) meta).executeBatchProtobuf(h, request.getProtoUpdateBatches()); - } else { - result = meta.executeBatch(h, request.parameterValues); - } - return new ExecuteBatchResponse(request.connectionId, request.statementId, - result.updateCounts, false, serverLevelRpcMetadata); - } catch (NoSuchStatementException e) { - return new ExecuteBatchResponse(request.connectionId, request.statementId, null, true, - serverLevelRpcMetadata); - } - } -} - -// End LocalService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java deleted file mode 100644 index 12a5b59..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetaDataOperation.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.proto.Common; - -import java.sql.DatabaseMetaData; -import java.sql.ResultSet; - -/** - * Identifies an operation from {@link DatabaseMetaData} which returns a {@link ResultSet}. This - * enum is used to allow clients to request the server to re-instantiate a {@link ResultSet} for - * these operations which do not have a SQL string associated with them as a normal query does. - */ -public enum MetaDataOperation { - GET_ATTRIBUTES, - GET_BEST_ROW_IDENTIFIER, - GET_CATALOGS, - GET_CLIENT_INFO_PROPERTIES, - GET_COLUMN_PRIVILEGES, - GET_COLUMNS, - GET_CROSS_REFERENCE, - GET_EXPORTED_KEYS, - GET_FUNCTION_COLUMNS, - GET_FUNCTIONS, - GET_IMPORTED_KEYS, - GET_INDEX_INFO, - GET_PRIMARY_KEYS, - GET_PROCEDURE_COLUMNS, - GET_PROCEDURES, - GET_PSEUDO_COLUMNS, - GET_SCHEMAS, - GET_SCHEMAS_WITH_ARGS, - GET_SUPER_TABLES, - GET_SUPER_TYPES, - GET_TABLE_PRIVILEGES, - GET_TABLES, - GET_TABLE_TYPES, - GET_TYPE_INFO, - GET_UDTS, - GET_VERSION_COLUMNS; - - public Common.MetaDataOperation toProto() { - switch (this) { - case GET_ATTRIBUTES: - return Common.MetaDataOperation.GET_ATTRIBUTES; - case GET_BEST_ROW_IDENTIFIER: - return Common.MetaDataOperation.GET_BEST_ROW_IDENTIFIER; - case GET_CATALOGS: - return Common.MetaDataOperation.GET_CATALOGS; - case GET_CLIENT_INFO_PROPERTIES: - return Common.MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; - case GET_COLUMNS: - return Common.MetaDataOperation.GET_COLUMNS; - case GET_COLUMN_PRIVILEGES: - return Common.MetaDataOperation.GET_COLUMN_PRIVILEGES; - case GET_CROSS_REFERENCE: - return Common.MetaDataOperation.GET_CROSS_REFERENCE; - case GET_EXPORTED_KEYS: - return Common.MetaDataOperation.GET_EXPORTED_KEYS; - case GET_FUNCTIONS: - return Common.MetaDataOperation.GET_FUNCTIONS; - case GET_FUNCTION_COLUMNS: - return Common.MetaDataOperation.GET_FUNCTION_COLUMNS; - case GET_IMPORTED_KEYS: - return Common.MetaDataOperation.GET_IMPORTED_KEYS; - case GET_INDEX_INFO: - return Common.MetaDataOperation.GET_INDEX_INFO; - case GET_PRIMARY_KEYS: - return Common.MetaDataOperation.GET_PRIMARY_KEYS; - case GET_PROCEDURES: - return Common.MetaDataOperation.GET_PROCEDURES; - case GET_PROCEDURE_COLUMNS: - return Common.MetaDataOperation.GET_PROCEDURE_COLUMNS; - case GET_PSEUDO_COLUMNS: - return Common.MetaDataOperation.GET_PSEUDO_COLUMNS; - case GET_SCHEMAS: - return Common.MetaDataOperation.GET_SCHEMAS; - case GET_SCHEMAS_WITH_ARGS: - return Common.MetaDataOperation.GET_SCHEMAS_WITH_ARGS; - case GET_SUPER_TABLES: - return Common.MetaDataOperation.GET_SUPER_TABLES; - case GET_SUPER_TYPES: - return Common.MetaDataOperation.GET_SUPER_TYPES; - case GET_TABLES: - return Common.MetaDataOperation.GET_TABLES; - case GET_TABLE_PRIVILEGES: - return Common.MetaDataOperation.GET_TABLE_PRIVILEGES; - case GET_TABLE_TYPES: - return Common.MetaDataOperation.GET_TABLE_TYPES; - case GET_TYPE_INFO: - return Common.MetaDataOperation.GET_TYPE_INFO; - case GET_UDTS: - return Common.MetaDataOperation.GET_UDTS; - case GET_VERSION_COLUMNS: - return Common.MetaDataOperation.GET_VERSION_COLUMNS; - default: - throw new RuntimeException("Unknown type: " + this); - } - } - - public static MetaDataOperation fromProto(Common.MetaDataOperation protoOp) { - // Null is acceptable - if (null == protoOp) { - return null; - } - - switch (protoOp) { - case GET_ATTRIBUTES: - return MetaDataOperation.GET_ATTRIBUTES; - case GET_BEST_ROW_IDENTIFIER: - return MetaDataOperation.GET_BEST_ROW_IDENTIFIER; - case GET_CATALOGS: - return MetaDataOperation.GET_CATALOGS; - case GET_CLIENT_INFO_PROPERTIES: - return MetaDataOperation.GET_CLIENT_INFO_PROPERTIES; - case GET_COLUMNS: - return MetaDataOperation.GET_COLUMNS; - case GET_COLUMN_PRIVILEGES: - return MetaDataOperation.GET_COLUMN_PRIVILEGES; - case GET_CROSS_REFERENCE: - return MetaDataOperation.GET_CROSS_REFERENCE; - case GET_EXPORTED_KEYS: - return MetaDataOperation.GET_EXPORTED_KEYS; - case GET_FUNCTIONS: - return MetaDataOperation.GET_FUNCTIONS; - case GET_FUNCTION_COLUMNS: - return MetaDataOperation.GET_FUNCTION_COLUMNS; - case GET_IMPORTED_KEYS: - return MetaDataOperation.GET_IMPORTED_KEYS; - case GET_INDEX_INFO: - return MetaDataOperation.GET_INDEX_INFO; - case GET_PRIMARY_KEYS: - return MetaDataOperation.GET_PRIMARY_KEYS; - case GET_PROCEDURES: - return MetaDataOperation.GET_PROCEDURES; - case GET_PROCEDURE_COLUMNS: - return MetaDataOperation.GET_PROCEDURE_COLUMNS; - case GET_PSEUDO_COLUMNS: - return MetaDataOperation.GET_PSEUDO_COLUMNS; - case GET_SCHEMAS: - return MetaDataOperation.GET_SCHEMAS; - case GET_SCHEMAS_WITH_ARGS: - return MetaDataOperation.GET_SCHEMAS_WITH_ARGS; - case GET_SUPER_TABLES: - return MetaDataOperation.GET_SUPER_TABLES; - case GET_SUPER_TYPES: - return MetaDataOperation.GET_SUPER_TYPES; - case GET_TABLES: - return MetaDataOperation.GET_TABLES; - case GET_TABLE_PRIVILEGES: - return MetaDataOperation.GET_TABLE_PRIVILEGES; - case GET_TABLE_TYPES: - return MetaDataOperation.GET_TABLE_TYPES; - case GET_TYPE_INFO: - return MetaDataOperation.GET_TYPE_INFO; - case GET_UDTS: - return MetaDataOperation.GET_UDTS; - case GET_VERSION_COLUMNS: - return MetaDataOperation.GET_VERSION_COLUMNS; - default: - throw new RuntimeException("Unknown type: " + protoOp); - } - } -} - -// End MetaDataOperation.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java deleted file mode 100644 index 2561b29..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MetricsHelper.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -/** - * A utility class to encapsulate common logic in use of metrics implementation. - */ -public class MetricsHelper { - - private static final String PERIOD = "."; - - private MetricsHelper() {} - - public static String concat(Class<?> clz, String name) { - StringBuilder sb = new StringBuilder(); - sb.append(clz.getName()); - return sb.append(PERIOD).append(name).toString(); - } - -} - -// End MetricsHelper.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java deleted file mode 100644 index 11a6104..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockJsonService.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; - -import java.io.IOException; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Map; - -/** - * Mock implementation of {@link Service} - * that encodes its requests and responses as JSON - * and looks up responses from a pre-defined map. - */ -public class MockJsonService extends JsonService { - private final Map<String, String> map; - - public MockJsonService(Map<String, String> map) { - this.map = map; - } - - @Override public String apply(String request) { - String response = map.get(request); - if (response == null) { - throw new RuntimeException("No response for " + request); - } - return response; - } - - /** Factory that creates a {@code MockJsonService}. */ - public static class Factory implements Service.Factory { - public Service create(AvaticaConnection connection) { - final String connectionId = connection.id; - final Map<String, String> map1 = new HashMap<>(); - try { - map1.put( - "{\"request\":\"openConnection\",\"connectionId\":\"" + connectionId + "\",\"info\":{}}", - "{\"response\":\"openConnection\"}"); - map1.put( - "{\"request\":\"closeConnection\",\"connectionId\":\"" + connectionId + "\"}", - "{\"response\":\"closeConnection\"}"); - map1.put( - "{\"request\":\"getSchemas\",\"catalog\":null,\"schemaPattern\":{\"s\":null}}", - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - JsonService.encode(new SchemasRequest(connectionId, null, null)), - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - JsonService.encode( - new TablesRequest(connectionId, null, null, null, Arrays.<String>asList())), - "{\"response\":\"resultSet\", updateCount: -1, firstFrame: {offset: 0, done: true, rows: []}}"); - map1.put( - "{\"request\":\"createStatement\",\"connectionId\":\"" + connectionId + "\"}", - "{\"response\":\"createStatement\",\"id\":0}"); - map1.put( - "{\"request\":\"prepareAndExecute\",\"statementId\":0," - + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", - "{\"response\":\"resultSet\", updateCount: -1, \"signature\": {\n" - + " \"columns\": [\n" - + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n" - + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" - + " ], \"cursorFactory\": {\"style\": \"ARRAY\"}\n" - + "}, \"rows\": [[1, \"a\"], [null, \"b\"], [3, \"c\"]]}"); - map1.put( - "{\"request\":\"prepare\",\"statementId\":0," - + "\"sql\":\"select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)\",\"maxRowCount\":-1}", - "{\"response\":\"prepare\",\"signature\": {\n" - + " \"columns\": [\n" - + " {\"columnName\": \"C1\", \"type\": {type: \"scalar\", id: 4, rep: \"INTEGER\"}},\n" - + " {\"columnName\": \"C2\", \"type\": {type: \"scalar\", id: 12, rep: \"STRING\"}}\n" - + " ],\n" - + " \"parameters\": [],\n" - + " \"cursorFactory\": {\"style\": \"ARRAY\"}\n" - + "}}"); - map1.put( - "{\"request\":\"getColumns\",\"connectionId\":\"" + connectionId + "\",\"catalog\":null,\"schemaPattern\":null," - + "\"tableNamePattern\":\"my_table\",\"columnNamePattern\":null}", - "{\"response\":\"resultSet\",\"connectionId\":\"00000000-0000-0000-0000-000000000000\",\"statementId\":-1,\"ownStatement\":true," - + "\"signature\":{\"columns\":[" - + "{\"ordinal\":0,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":false," - + "\"displaySize\":40,\"label\":\"TABLE_NAME\",\"columnName\":\"TABLE_NAME\",\"schemaName\":\"\",\"precision\":0,\"scale\":0,\"tableName\":\"SYSTEM.TABLE\"," - + "\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":12,\"name\":\"VARCHAR\",\"rep\":\"STRING\"},\"readOnly\":true,\"writable\":false," - + "\"definitelyWritable\":false,\"columnClassName\":\"java.lang.String\"}," - + "{\"ordinal\":1,\"autoIncrement\":false,\"caseSensitive\":false,\"searchable\":true,\"currency\":false,\"nullable\":1,\"signed\":true," - + "\"displaySize\":40,\"label\":\"ORDINAL_POSITION\",\"columnName\":\"ORDINAL_POSITION\",\"schemaName\":\"\",\"precision\":0,\"scale\":0," - + "\"tableName\":\"SYSTEM.TABLE\",\"catalogName\":\"\",\"type\":{\"type\":\"scalar\",\"id\":-5,\"name\":\"BIGINT\",\"rep\":\"PRIMITIVE_LONG\"}," - + "\"readOnly\":true,\"writable\":false,\"definitelyWritable\":false,\"columnClassName\":\"java.lang.Long\"}" - + "],\"sql\":null," - + "\"parameters\":[]," - + "\"cursorFactory\":{\"style\":\"LIST\",\"clazz\":null,\"fieldNames\":null},\"statementType\":null}," - + "\"firstFrame\":{\"offset\":0,\"done\":true," - + "\"rows\":[[\"my_table\",10]]" - + "},\"updateCount\":-1}"); - } catch (IOException e) { - throw new RuntimeException(e); - } - return new MockJsonService(map1); - } - } -} - -// End MockJsonService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java deleted file mode 100644 index a2cdd67..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/MockProtobufService.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.AvaticaConnection; -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.MetaImpl; - -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - -/** - * A mock implementation of ProtobufService for testing. - * - * <p>It performs no serialization of requests and responses. - */ -public class MockProtobufService extends ProtobufService { - - private final String connectionId; - private final Map<Request, Response> mapping; - - public MockProtobufService(String connectionId) { - this.connectionId = connectionId; - this.mapping = createMapping(); - } - - private Map<Request, Response> createMapping() { - HashMap<Request, Response> mappings = new HashMap<>(); - - // Add in mappings - - mappings.put( - new OpenConnectionRequest(connectionId, new HashMap<String, String>()), - new OpenConnectionResponse()); - - // Get the schema, no.. schema..? - mappings.put( - new SchemasRequest(connectionId, null, null), - // ownStatement=false just to avoid the extra close statement call. - new ResultSetResponse(null, 1, false, null, Meta.Frame.EMPTY, -1, null)); - - // Get the tables, no tables exist - mappings.put(new TablesRequest(connectionId, null, null, null, Collections.<String>emptyList()), - // ownStatement=false just to avoid the extra close statement call. - new ResultSetResponse(null, 150, false, null, Meta.Frame.EMPTY, -1, null)); - - // Create a statement, get back an id - mappings.put(new CreateStatementRequest("0"), new CreateStatementResponse("0", 1, null)); - - // Prepare and execute a query. Values and schema are returned - mappings.put( - new PrepareAndExecuteRequest(connectionId, 1, - "select * from (\\n values (1, 'a'), (null, 'b'), (3, 'c')) as t (c1, c2)", -1), - new ResultSetResponse("0", 1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("C1", 0, Integer.class, true), - MetaImpl.columnMetaData("C2", 1, String.class, true)), - null, null, Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), - Meta.Frame.create(0, true, - Arrays.<Object>asList(new Object[] {1, "a"}, - new Object[] {null, "b"}, new Object[] {3, "c"})), -1, null)); - - // Prepare a query. Schema for results are returned, but no values - mappings.put( - new PrepareRequest(connectionId, - "select * from (\\n values(1, 'a'), (null, 'b'), (3, 'c')), as t (c1, c2)", -1), - new ResultSetResponse("0", 1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("C1", 0, Integer.class, true), - MetaImpl.columnMetaData("C2", 1, String.class, true)), - null, Collections.<AvaticaParameter>emptyList(), - Meta.CursorFactory.ARRAY, Meta.StatementType.SELECT), - null, -1, null)); - - mappings.put( - new ColumnsRequest(connectionId, null, null, "my_table", null), - new ResultSetResponse("00000000-0000-0000-0000-000000000000", -1, true, - Meta.Signature.create( - Arrays.<ColumnMetaData>asList( - MetaImpl.columnMetaData("TABLE_NAME", 0, String.class, true), - MetaImpl.columnMetaData("ORDINAL_POSITION", 1, Long.class, true)), null, - Collections.<AvaticaParameter>emptyList(), Meta.CursorFactory.ARRAY, null), - Meta.Frame.create(0, true, - Arrays.<Object>asList(new Object[] {new Object[]{"my_table", 10}})), -1, null)); - - return Collections.unmodifiableMap(mappings); - } - - @Override public Response _apply(Request request) { - if (request instanceof CloseConnectionRequest) { - return new CloseConnectionResponse(); - } - - return dispatch(request); - } - - /** - * Fetches the static response for the given request. - * - * @param request the client's request - * @return the appropriate response - * @throws RuntimeException if no mapping is found for the request - */ - private Response dispatch(Request request) { - Response response = mapping.get(request); - - if (null == response) { - throw new RuntimeException("Had no response mapping for " + request); - } - - return response; - } - - /** - * A factory that instantiates the mock protobuf service. - */ - public static class MockProtobufServiceFactory implements Service.Factory { - @Override public Service create(AvaticaConnection connection) { - return new MockProtobufService(connection.id); - } - } -} - -// End MockProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java deleted file mode 100644 index 89e380e..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufHandler.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.Timer; -import org.apache.calcite.avatica.metrics.Timer.Context; -import org.apache.calcite.avatica.remote.Service.Response; - -import java.io.IOException; - -/** - * Dispatches serialized protocol buffer messages to the provided {@link Service} - * by converting them to the POJO Request. Returns back the serialized protocol - * buffer response. - */ -public class ProtobufHandler extends AbstractHandler<byte[]> { - - private final ProtobufTranslation translation; - private final MetricsSystem metrics; - private final Timer serializationTimer; - - public ProtobufHandler(Service service, ProtobufTranslation translation, MetricsSystem metrics) { - super(service); - this.translation = translation; - this.metrics = metrics; - this.serializationTimer = this.metrics.getTimer( - MetricsHelper.concat(ProtobufHandler.class, HANDLER_SERIALIZATION_METRICS_NAME)); - } - - @Override public HandlerResponse<byte[]> apply(byte[] requestBytes) { - return super.apply(requestBytes); - } - - @Override Service.Request decode(byte[] serializedRequest) throws IOException { - try (final Context ctx = serializationTimer.start()) { - return translation.parseRequest(serializedRequest); - } - } - - @Override byte[] encode(Response response) throws IOException { - try (final Context ctx = serializationTimer.start()) { - return translation.serializeResponse(response); - } - } -} - -// End ProtobufHandler.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufMeta.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufMeta.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufMeta.java deleted file mode 100644 index 375ae80..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufMeta.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.NoSuchStatementException; -import org.apache.calcite.avatica.proto.Requests; - -import java.util.List; - -/** - * An extension of {@link Meta} which allows for native processing of calls with the Protobuf - * API objects instead of the POJOS (to avoid object translation). In the write-path, performing - * this conversion tends to represent a signficant portion of execution time. The introduction - * of this interface is to serve the purose of gradual migration to Meta implementations that - * can naturally function over Protobuf objects instead of the POJOs. - */ -public interface ProtobufMeta extends Meta { - - /** - * Executes a batch of commands on a prepared statement. - * - * @param h Statement handle - * @param parameterValues A collection of list of typed values, one list per batch - * @return An array of update counts containing one element for each command in the batch. - */ - ExecuteBatchResult executeBatchProtobuf(StatementHandle h, List<Requests.UpdateBatch> - parameterValues) throws NoSuchStatementException; -} - -// End ProtobufMeta.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java deleted file mode 100644 index d694440..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufService.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import com.google.protobuf.Message; - -/** - * Service implementation that encodes requests and responses as protocol buffers. - */ -public abstract class ProtobufService extends AbstractService { - - /** - * Derived class should implement this method to transport requests and - * responses to and from the peer service. - */ - public abstract Response _apply(Request request); - - @Override SerializationType getSerializationType() { - return SerializationType.PROTOBUF; - } - - @Override public ResultSetResponse apply(CatalogsRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(SchemasRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TablesRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TableTypesRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(TypeInfoRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public ResultSetResponse apply(ColumnsRequest request) { - return finagle((ResultSetResponse) _apply(request)); - } - - @Override public PrepareResponse apply(PrepareRequest request) { - return finagle((PrepareResponse) _apply(request)); - } - - @Override public ExecuteResponse apply(PrepareAndExecuteRequest request) { - return finagle((ExecuteResponse) _apply(request)); - } - - @Override public FetchResponse apply(FetchRequest request) { - return (FetchResponse) _apply(request); - } - - @Override public CreateStatementResponse apply(CreateStatementRequest request) { - return (CreateStatementResponse) _apply(request); - } - - @Override public CloseStatementResponse apply(CloseStatementRequest request) { - return (CloseStatementResponse) _apply(request); - } - - @Override public OpenConnectionResponse apply(OpenConnectionRequest request) { - return (OpenConnectionResponse) _apply(request); - } - - @Override public CloseConnectionResponse apply(CloseConnectionRequest request) { - return (CloseConnectionResponse) _apply(request); - } - - @Override public ConnectionSyncResponse apply(ConnectionSyncRequest request) { - return (ConnectionSyncResponse) _apply(request); - } - - @Override public DatabasePropertyResponse apply(DatabasePropertyRequest request) { - return (DatabasePropertyResponse) _apply(request); - } - - @Override public ExecuteResponse apply(ExecuteRequest request) { - return finagle((ExecuteResponse) _apply(request)); - } - - @Override public SyncResultsResponse apply(SyncResultsRequest request) { - return (SyncResultsResponse) _apply(request); - } - - @Override public CommitResponse apply(CommitRequest request) { - return (CommitResponse) _apply(request); - } - - @Override public RollbackResponse apply(RollbackRequest request) { - return (RollbackResponse) _apply(request); - } - - @Override public ExecuteBatchResponse apply(PrepareAndExecuteBatchRequest request) { - return (ExecuteBatchResponse) _apply(request); - } - - @Override public ExecuteBatchResponse apply(ExecuteBatchRequest request) { - return (ExecuteBatchResponse) _apply(request); - } - - /** - * Checks if the provided {@link Message} is an instance of the Class given by - * <code>expectedType</code>. Throws an IllegalArgumentException if the message is not of the - * expected type, otherwise, it returns the message cast as the expected type. - * - * @param msg A Protocol Buffer message. - * @param expectedType The expected type of the Protocol Buffer message. - * @return The msg cast to the concrete Message type. - * @throws IllegalArgumentException If the type of the message is not the expectedType. - */ - public static <T extends Message> T castProtobufMessage(Message msg, Class<T> expectedType) { - if (!expectedType.isInstance(msg)) { - throw new IllegalArgumentException("Expected instance of " + expectedType.getName() - + ", but got " + msg.getClass().getName()); - } - - return expectedType.cast(msg); - } -} - -// End ProtobufService.java http://git-wip-us.apache.org/repos/asf/calcite-avatica/blob/fc7b26c8/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java ---------------------------------------------------------------------- diff --git a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java b/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java deleted file mode 100644 index 7142d59..0000000 --- a/avatica/core/src/main/java/org/apache/calcite/avatica/remote/ProtobufTranslation.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.remote; - -import org.apache.calcite.avatica.remote.Service.Request; -import org.apache.calcite.avatica.remote.Service.Response; - -import java.io.IOException; - -/** - * Generic interface to support parsing of serialized protocol buffers between client and server. - */ -public interface ProtobufTranslation { - - /** - * Serializes a {@link Response} as a protocol buffer. - * - * @param response The response to serialize - * @throws IOException If there are errors during serialization - */ - byte[] serializeResponse(Response response) throws IOException; - - /** - * Serializes a {@link Request} as a protocol buffer. - * - * @param request The request to serialize - * @throws IOException If there are errors during serialization - */ - byte[] serializeRequest(Request request) throws IOException; - - /** - * Parses a serialized protocol buffer request into a {@link Request}. - * - * @param bytes Serialized protocol buffer request from client - * @return A Request object for the given bytes - * @throws IOException If the protocol buffer cannot be deserialized - */ - Request parseRequest(byte[] bytes) throws IOException; - - /** - * Parses a serialized protocol buffer response into a {@link Response}. - * - * @param bytes Serialized protocol buffer request from server - * @return The Response object for the given bytes - * @throws IOException If the protocol buffer cannot be deserialized - */ - Response parseResponse(byte[] bytes) throws IOException; -} - -// End ProtobufTranslation.java
