http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java deleted file mode 100644 index dfe7f99..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcMeta.java +++ /dev/null @@ -1,976 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.jdbc; - -import org.apache.calcite.avatica.AvaticaParameter; -import org.apache.calcite.avatica.AvaticaPreparedStatement; -import org.apache.calcite.avatica.AvaticaUtils; -import org.apache.calcite.avatica.ColumnMetaData; -import org.apache.calcite.avatica.ConnectionPropertiesImpl; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.MetaImpl; -import org.apache.calcite.avatica.MissingResultsException; -import org.apache.calcite.avatica.NoSuchConnectionException; -import org.apache.calcite.avatica.NoSuchStatementException; -import org.apache.calcite.avatica.QueryState; -import org.apache.calcite.avatica.SqlType; -import org.apache.calcite.avatica.metrics.Gauge; -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; -import org.apache.calcite.avatica.remote.TypedValue; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.calcite.avatica.remote.MetricsHelper.concat; - -import java.lang.reflect.InvocationTargetException; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.ParameterMetaData; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -/** Implementation of {@link Meta} upon an existing JDBC data source. 
*/ -public class JdbcMeta implements Meta { - private static final Logger LOG = LoggerFactory.getLogger(JdbcMeta.class); - - private static final String CONN_CACHE_KEY_BASE = "avatica.connectioncache"; - - private static final String STMT_CACHE_KEY_BASE = "avatica.statementcache"; - - /** Special value for {@code Statement#getLargeMaxRows()} that means fetch - * an unlimited number of rows in a single batch. - * - * <p>Any other negative value will return an unlimited number of rows but - * will do it in the default batch size, namely 100. */ - public static final long UNLIMITED_COUNT = -2L; - - // End of constants, start of member variables - - final Calendar calendar = Calendar.getInstance(); - - /** Generates ids for statements. The ids are unique across all connections - * created by this JdbcMeta. */ - private final AtomicInteger statementIdGenerator = new AtomicInteger(); - - private final String url; - private final Properties info; - private final Cache<String, Connection> connectionCache; - private final Cache<Integer, StatementInfo> statementCache; - private final MetricsSystem metrics; - - /** - * Creates a JdbcMeta. - * - * @param url a database url of the form - * <code>jdbc:<em>subprotocol</em>:<em>subname</em></code> - */ - public JdbcMeta(String url) throws SQLException { - this(url, new Properties()); - } - - /** - * Creates a JdbcMeta. - * - * @param url a database url of the form - * <code>jdbc:<em>subprotocol</em>:<em>subname</em></code> - * @param user the database user on whose behalf the connection is being - * made - * @param password the user's password - */ - public JdbcMeta(final String url, final String user, final String password) - throws SQLException { - this(url, new Properties() { - { - put("user", user); - put("password", password); - } - }); - } - - public JdbcMeta(String url, Properties info) throws SQLException { - this(url, info, NoopMetricsSystem.getInstance()); - } - - /** - * Creates a JdbcMeta. 
- * - * @param url a database url of the form - * <code> jdbc:<em>subprotocol</em>:<em>subname</em></code> - * @param info a list of arbitrary string tag/value pairs as - * connection arguments; normally at least a "user" and - * "password" property should be included - */ - public JdbcMeta(String url, Properties info, MetricsSystem metrics) - throws SQLException { - this.url = url; - this.info = info; - this.metrics = Objects.requireNonNull(metrics); - - int concurrencyLevel = Integer.parseInt( - info.getProperty(ConnectionCacheSettings.CONCURRENCY_LEVEL.key(), - ConnectionCacheSettings.CONCURRENCY_LEVEL.defaultValue())); - int initialCapacity = Integer.parseInt( - info.getProperty(ConnectionCacheSettings.INITIAL_CAPACITY.key(), - ConnectionCacheSettings.INITIAL_CAPACITY.defaultValue())); - long maxCapacity = Long.parseLong( - info.getProperty(ConnectionCacheSettings.MAX_CAPACITY.key(), - ConnectionCacheSettings.MAX_CAPACITY.defaultValue())); - long connectionExpiryDuration = Long.parseLong( - info.getProperty(ConnectionCacheSettings.EXPIRY_DURATION.key(), - ConnectionCacheSettings.EXPIRY_DURATION.defaultValue())); - TimeUnit connectionExpiryUnit = TimeUnit.valueOf( - info.getProperty(ConnectionCacheSettings.EXPIRY_UNIT.key(), - ConnectionCacheSettings.EXPIRY_UNIT.defaultValue())); - this.connectionCache = CacheBuilder.newBuilder() - .concurrencyLevel(concurrencyLevel) - .initialCapacity(initialCapacity) - .maximumSize(maxCapacity) - .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit) - .removalListener(new ConnectionExpiryHandler()) - .build(); - LOG.debug("instantiated connection cache: {}", connectionCache.stats()); - - concurrencyLevel = Integer.parseInt( - info.getProperty(StatementCacheSettings.CONCURRENCY_LEVEL.key(), - StatementCacheSettings.CONCURRENCY_LEVEL.defaultValue())); - initialCapacity = Integer.parseInt( - info.getProperty(StatementCacheSettings.INITIAL_CAPACITY.key(), - StatementCacheSettings.INITIAL_CAPACITY.defaultValue())); - 
maxCapacity = Long.parseLong( - info.getProperty(StatementCacheSettings.MAX_CAPACITY.key(), - StatementCacheSettings.MAX_CAPACITY.defaultValue())); - connectionExpiryDuration = Long.parseLong( - info.getProperty(StatementCacheSettings.EXPIRY_DURATION.key(), - StatementCacheSettings.EXPIRY_DURATION.defaultValue())); - connectionExpiryUnit = TimeUnit.valueOf( - info.getProperty(StatementCacheSettings.EXPIRY_UNIT.key(), - StatementCacheSettings.EXPIRY_UNIT.defaultValue())); - this.statementCache = CacheBuilder.newBuilder() - .concurrencyLevel(concurrencyLevel) - .initialCapacity(initialCapacity) - .maximumSize(maxCapacity) - .expireAfterAccess(connectionExpiryDuration, connectionExpiryUnit) - .removalListener(new StatementExpiryHandler()) - .build(); - - LOG.debug("instantiated statement cache: {}", statementCache.stats()); - - // Register some metrics - this.metrics.register(concat(JdbcMeta.class, "ConnectionCacheSize"), new Gauge<Long>() { - @Override public Long getValue() { - return connectionCache.size(); - } - }); - - this.metrics.register(concat(JdbcMeta.class, "StatementCacheSize"), new Gauge<Long>() { - @Override public Long getValue() { - return statementCache.size(); - } - }); - } - - /** - * Converts from JDBC metadata to Avatica columns. 
- */ - protected static List<ColumnMetaData> - columns(ResultSetMetaData metaData) throws SQLException { - if (metaData == null) { - return Collections.emptyList(); - } - final List<ColumnMetaData> columns = new ArrayList<>(); - for (int i = 1; i <= metaData.getColumnCount(); i++) { - final SqlType sqlType = SqlType.valueOf(metaData.getColumnType(i)); - final ColumnMetaData.Rep rep = ColumnMetaData.Rep.of(sqlType.internal); - final ColumnMetaData.AvaticaType t; - if (sqlType == SqlType.ARRAY || sqlType == SqlType.STRUCT || sqlType == SqlType.MULTISET) { - ColumnMetaData.AvaticaType arrayValueType = ColumnMetaData.scalar(Types.JAVA_OBJECT, - metaData.getColumnTypeName(i), ColumnMetaData.Rep.OBJECT); - t = ColumnMetaData.array(arrayValueType, metaData.getColumnTypeName(i), rep); - } else { - t = ColumnMetaData.scalar(metaData.getColumnType(i), metaData.getColumnTypeName(i), rep); - } - ColumnMetaData md = - new ColumnMetaData(i - 1, metaData.isAutoIncrement(i), - metaData.isCaseSensitive(i), metaData.isSearchable(i), - metaData.isCurrency(i), metaData.isNullable(i), - metaData.isSigned(i), metaData.getColumnDisplaySize(i), - metaData.getColumnLabel(i), metaData.getColumnName(i), - metaData.getSchemaName(i), metaData.getPrecision(i), - metaData.getScale(i), metaData.getTableName(i), - metaData.getCatalogName(i), t, metaData.isReadOnly(i), - metaData.isWritable(i), metaData.isDefinitelyWritable(i), - metaData.getColumnClassName(i)); - columns.add(md); - } - return columns; - } - - /** - * Converts from JDBC metadata to Avatica parameters - */ - protected static List<AvaticaParameter> parameters(ParameterMetaData metaData) - throws SQLException { - if (metaData == null) { - return Collections.emptyList(); - } - final List<AvaticaParameter> params = new ArrayList<>(); - for (int i = 1; i <= metaData.getParameterCount(); i++) { - params.add( - new AvaticaParameter(metaData.isSigned(i), metaData.getPrecision(i), - metaData.getScale(i), metaData.getParameterType(i), - 
metaData.getParameterTypeName(i), - metaData.getParameterClassName(i), "?" + i)); - } - return params; - } - - protected static Signature signature(ResultSetMetaData metaData, - ParameterMetaData parameterMetaData, String sql, - Meta.StatementType statementType) throws SQLException { - final CursorFactory cf = CursorFactory.LIST; // because JdbcResultSet#frame - return new Signature(columns(metaData), sql, parameters(parameterMetaData), - null, cf, statementType); - } - - protected static Signature signature(ResultSetMetaData metaData) - throws SQLException { - return signature(metaData, null, null, null); - } - - public Map<DatabaseProperty, Object> getDatabaseProperties(ConnectionHandle ch) { - try { - final Map<DatabaseProperty, Object> map = new HashMap<>(); - final DatabaseMetaData metaData = getConnection(ch.id).getMetaData(); - for (DatabaseProperty p : DatabaseProperty.values()) { - addProperty(map, metaData, p); - } - return map; - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - private static Object addProperty(Map<DatabaseProperty, Object> map, - DatabaseMetaData metaData, DatabaseProperty p) throws SQLException { - try { - return map.put(p, p.method.invoke(metaData)); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getTables(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern, List<String> typeList) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getTables(catalog, schemaPattern.s, - tableNamePattern.s, toArray(typeList)); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** - * Registers a StatementInfo for the given ResultSet, returning the id under - * which it is registered. This should be used for metadata ResultSets, which - * have an implicit statement created. 
- */ - private int registerMetaStatement(ResultSet rs) throws SQLException { - final int id = statementIdGenerator.getAndIncrement(); - StatementInfo statementInfo = new StatementInfo(rs.getStatement()); - statementInfo.setResultSet(rs); - statementCache.put(id, statementInfo); - return id; - } - - public MetaResultSet getColumns(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern, Pat columnNamePattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getColumns(catalog, schemaPattern.s, - tableNamePattern.s, columnNamePattern.s); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getSchemas(ConnectionHandle ch, String catalog, Pat schemaPattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getSchemas(catalog, schemaPattern.s); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getCatalogs(ConnectionHandle ch) { - try { - final ResultSet rs = getConnection(ch.id).getMetaData().getCatalogs(); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getTableTypes(ConnectionHandle ch) { - try { - final ResultSet rs = getConnection(ch.id).getMetaData().getTableTypes(); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getProcedures(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat procedureNamePattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getProcedures(catalog, schemaPattern.s, - procedureNamePattern.s); - int stmtId = 
registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getProcedureColumns(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat procedureNamePattern, Pat columnNamePattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getProcedureColumns(catalog, - schemaPattern.s, procedureNamePattern.s, columnNamePattern.s); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getColumnPrivileges(ConnectionHandle ch, String catalog, String schema, - String table, Pat columnNamePattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getColumnPrivileges(catalog, schema, - table, columnNamePattern.s); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getTablePrivileges(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern) { - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getTablePrivileges(catalog, - schemaPattern.s, tableNamePattern.s); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getBestRowIdentifier(ConnectionHandle ch, String catalog, String schema, - String table, int scope, boolean nullable) { - LOG.trace("getBestRowIdentifier catalog:{} schema:{} table:{} scope:{} nullable:{}", catalog, - schema, table, scope, nullable); - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getBestRowIdentifier(catalog, schema, - table, scope, nullable); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch 
(SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getVersionColumns(ConnectionHandle ch, String catalog, String schema, - String table) { - LOG.trace("getVersionColumns catalog:{} schema:{} table:{}", catalog, schema, table); - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getVersionColumns(catalog, schema, table); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getPrimaryKeys(ConnectionHandle ch, String catalog, String schema, - String table) { - LOG.trace("getPrimaryKeys catalog:{} schema:{} table:{}", catalog, schema, table); - try { - final ResultSet rs = - getConnection(ch.id).getMetaData().getPrimaryKeys(catalog, schema, table); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getImportedKeys(ConnectionHandle ch, String catalog, String schema, - String table) { - return null; - } - - public MetaResultSet getExportedKeys(ConnectionHandle ch, String catalog, String schema, - String table) { - return null; - } - - public MetaResultSet getCrossReference(ConnectionHandle ch, String parentCatalog, - String parentSchema, String parentTable, String foreignCatalog, - String foreignSchema, String foreignTable) { - return null; - } - - public MetaResultSet getTypeInfo(ConnectionHandle ch) { - try { - final ResultSet rs = getConnection(ch.id).getMetaData().getTypeInfo(); - int stmtId = registerMetaStatement(rs); - return JdbcResultSet.create(ch.id, stmtId, rs); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public MetaResultSet getIndexInfo(ConnectionHandle ch, String catalog, String schema, - String table, boolean unique, boolean approximate) { - return null; - } - - public MetaResultSet getUDTs(ConnectionHandle ch, 
String catalog, Pat schemaPattern, - Pat typeNamePattern, int[] types) { - return null; - } - - public MetaResultSet getSuperTypes(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat typeNamePattern) { - return null; - } - - public MetaResultSet getSuperTables(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern) { - return null; - } - - public MetaResultSet getAttributes(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat typeNamePattern, Pat attributeNamePattern) { - return null; - } - - public MetaResultSet getClientInfoProperties(ConnectionHandle ch) { - return null; - } - - public MetaResultSet getFunctions(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat functionNamePattern) { - return null; - } - - public MetaResultSet getFunctionColumns(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat functionNamePattern, Pat columnNamePattern) { - return null; - } - - public MetaResultSet getPseudoColumns(ConnectionHandle ch, String catalog, Pat schemaPattern, - Pat tableNamePattern, Pat columnNamePattern) { - return null; - } - - public Iterable<Object> createIterable(StatementHandle handle, QueryState state, - Signature signature, List<TypedValue> parameterValues, Frame firstFrame) { - return null; - } - - protected Connection getConnection(String id) throws SQLException { - if (id == null) { - throw new NullPointerException("Connection id is null."); - } - Connection conn = connectionCache.getIfPresent(id); - if (conn == null) { - throw new NoSuchConnectionException("Connection not found: invalid id, closed, or expired: " - + id); - } - return conn; - } - - public StatementHandle createStatement(ConnectionHandle ch) { - try { - final Connection conn = getConnection(ch.id); - final Statement statement = conn.createStatement(); - final int id = statementIdGenerator.getAndIncrement(); - statementCache.put(id, new StatementInfo(statement)); - StatementHandle h = new StatementHandle(ch.id, id, null); - 
LOG.trace("created statement {}", h); - return h; - } catch (SQLException e) { - throw propagate(e); - } - } - - @Override public void closeStatement(StatementHandle h) { - StatementInfo info = statementCache.getIfPresent(h.id); - if (info == null || info.statement == null) { - LOG.debug("client requested close unknown statement {}", h); - return; - } - LOG.trace("closing statement {}", h); - try { - ResultSet results = info.getResultSet(); - if (info.isResultSetInitialized() && null != results) { - results.close(); - } - info.statement.close(); - } catch (SQLException e) { - throw propagate(e); - } finally { - statementCache.invalidate(h.id); - } - } - - @Override public void openConnection(ConnectionHandle ch, - Map<String, String> info) { - Properties fullInfo = new Properties(); - fullInfo.putAll(this.info); - if (info != null) { - fullInfo.putAll(info); - } - - synchronized (this) { - try { - if (connectionCache.asMap().containsKey(ch.id)) { - throw new RuntimeException("Connection already exists: " + ch.id); - } - Connection conn = DriverManager.getConnection(url, fullInfo); - connectionCache.put(ch.id, conn); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - } - - @Override public void closeConnection(ConnectionHandle ch) { - Connection conn = connectionCache.getIfPresent(ch.id); - if (conn == null) { - LOG.debug("client requested close unknown connection {}", ch); - return; - } - LOG.trace("closing connection {}", ch); - try { - conn.close(); - } catch (SQLException e) { - throw propagate(e); - } finally { - connectionCache.invalidate(ch.id); - } - } - - protected void apply(Connection conn, ConnectionProperties connProps) - throws SQLException { - if (connProps.isAutoCommit() != null) { - conn.setAutoCommit(connProps.isAutoCommit()); - } - if (connProps.isReadOnly() != null) { - conn.setReadOnly(connProps.isReadOnly()); - } - if (connProps.getTransactionIsolation() != null) { - 
conn.setTransactionIsolation(connProps.getTransactionIsolation()); - } - if (connProps.getCatalog() != null) { - conn.setCatalog(connProps.getCatalog()); - } - if (connProps.getSchema() != null) { - conn.setSchema(connProps.getSchema()); - } - } - - @Override public ConnectionProperties connectionSync(ConnectionHandle ch, - ConnectionProperties connProps) { - LOG.trace("syncing properties for connection {}", ch); - try { - Connection conn = getConnection(ch.id); - ConnectionPropertiesImpl props = new ConnectionPropertiesImpl(conn).merge(connProps); - if (props.isDirty()) { - apply(conn, props); - props.setDirty(false); - } - return props; - } catch (SQLException e) { - throw propagate(e); - } - } - - RuntimeException propagate(Throwable e) { - if (e instanceof RuntimeException) { - throw (RuntimeException) e; - } else if (e instanceof Error) { - throw (Error) e; - } else { - throw new RuntimeException(e); - } - } - - public StatementHandle prepare(ConnectionHandle ch, String sql, - long maxRowCount) { - try { - final Connection conn = getConnection(ch.id); - final PreparedStatement statement = conn.prepareStatement(sql); - final int id = statementIdGenerator.getAndIncrement(); - Meta.StatementType statementType = null; - if (statement.isWrapperFor(AvaticaPreparedStatement.class)) { - final AvaticaPreparedStatement avaticaPreparedStatement; - avaticaPreparedStatement = - statement.unwrap(AvaticaPreparedStatement.class); - statementType = avaticaPreparedStatement.getStatementType(); - } - statementCache.put(id, new StatementInfo(statement)); - StatementHandle h = new StatementHandle(ch.id, id, - signature(statement.getMetaData(), statement.getParameterMetaData(), - sql, statementType)); - LOG.trace("prepared statement {}", h); - return h; - } catch (SQLException e) { - throw propagate(e); - } - } - - public ExecuteResult prepareAndExecute(StatementHandle h, String sql, - long maxRowCount, PrepareCallback callback) throws NoSuchStatementException { - try { - final 
StatementInfo info = statementCache.getIfPresent(h.id); - if (info == null) { - throw new NoSuchStatementException(h); - } - final Statement statement = info.statement; - // Special handling of maxRowCount as JDBC 0 is unlimited, our meta 0 row - if (maxRowCount > 0) { - AvaticaUtils.setLargeMaxRows(statement, maxRowCount); - } else if (maxRowCount < 0) { - statement.setMaxRows(0); - } - boolean ret = statement.execute(sql); - info.setResultSet(statement.getResultSet()); - // Either execute(sql) returned true or the resultSet was null - assert ret || null == info.getResultSet(); - final List<MetaResultSet> resultSets = new ArrayList<>(); - if (null == info.getResultSet()) { - // Create a special result set that just carries update count - resultSets.add( - JdbcResultSet.count(h.connectionId, h.id, - AvaticaUtils.getLargeUpdateCount(statement))); - } else { - resultSets.add( - JdbcResultSet.create(h.connectionId, h.id, info.getResultSet(), maxRowCount)); - } - LOG.trace("prepAndExec statement {}", h); - // TODO: review client to ensure statementId is updated when appropriate - return new ExecuteResult(resultSets); - } catch (SQLException e) { - throw propagate(e); - } - } - - public boolean syncResults(StatementHandle sh, QueryState state, long offset) - throws NoSuchStatementException { - try { - final Connection conn = getConnection(sh.connectionId); - final StatementInfo info = statementCache.getIfPresent(sh.id); - if (null == info) { - throw new NoSuchStatementException(sh); - } - final Statement statement = info.statement; - // Let the state recreate the necessary ResultSet on the Statement - info.setResultSet(state.invoke(conn, statement)); - - if (null != info.getResultSet()) { - // If it is non-null, try to advance to the requested offset. - return info.advanceResultSetToOffset(info.getResultSet(), offset); - } - - // No results, nothing to do. Client can move on. 
- return false; - } catch (SQLException e) { - throw propagate(e); - } - } - - public Frame fetch(StatementHandle h, long offset, int fetchMaxRowCount) throws - NoSuchStatementException, MissingResultsException { - LOG.trace("fetching {} offset:{} fetchMaxRowCount:{}", h, offset, fetchMaxRowCount); - try { - final StatementInfo statementInfo = statementCache.getIfPresent(h.id); - if (null == statementInfo) { - // Statement might have expired, or never existed on this server. - throw new NoSuchStatementException(h); - } - - if (!statementInfo.isResultSetInitialized()) { - // The Statement exists, but the results are missing. Need to call syncResults(...) - throw new MissingResultsException(h); - } - if (statementInfo.getResultSet() == null) { - return Frame.EMPTY; - } else { - return JdbcResultSet.frame(statementInfo, statementInfo.getResultSet(), offset, - fetchMaxRowCount, calendar); - } - } catch (SQLException e) { - throw propagate(e); - } - } - - private static String[] toArray(List<String> typeList) { - if (typeList == null) { - return null; - } - return typeList.toArray(new String[typeList.size()]); - } - - @Override public ExecuteResult execute(StatementHandle h, - List<TypedValue> parameterValues, long maxRowCount) throws NoSuchStatementException { - try { - if (MetaImpl.checkParameterValueHasNull(parameterValues)) { - throw new SQLException("exception while executing query: unbound parameter"); - } - - final StatementInfo statementInfo = statementCache.getIfPresent(h.id); - if (null == statementInfo) { - throw new NoSuchStatementException(h); - } - final List<MetaResultSet> resultSets; - final PreparedStatement preparedStatement = - (PreparedStatement) statementInfo.statement; - - if (parameterValues != null) { - for (int i = 0; i < parameterValues.size(); i++) { - TypedValue o = parameterValues.get(i); - preparedStatement.setObject(i + 1, o.toJdbc(calendar)); - } - } - - if (preparedStatement.execute()) { - final Meta.Frame frame; - final Signature 
signature2; - if (preparedStatement.isWrapperFor(AvaticaPreparedStatement.class)) { - signature2 = h.signature; - } else { - h.signature = signature(preparedStatement.getMetaData(), - preparedStatement.getParameterMetaData(), h.signature.sql, - Meta.StatementType.SELECT); - signature2 = h.signature; - } - - // Make sure we set this for subsequent fetch()'s to find the result set. - statementInfo.setResultSet(preparedStatement.getResultSet()); - - if (statementInfo.getResultSet() == null) { - frame = Frame.EMPTY; - resultSets = Collections.<MetaResultSet>singletonList( - JdbcResultSet.empty(h.connectionId, h.id, signature2)); - } else { - resultSets = Collections.<MetaResultSet>singletonList( - JdbcResultSet.create(h.connectionId, h.id, statementInfo.getResultSet(), - maxRowCount, signature2)); - } - } else { - resultSets = Collections.<MetaResultSet>singletonList( - JdbcResultSet.count(h.connectionId, h.id, preparedStatement.getUpdateCount())); - } - - return new ExecuteResult(resultSets); - } catch (SQLException e) { - throw propagate(e); - } - } - - @Override public void commit(ConnectionHandle ch) { - try { - final Connection conn = getConnection(ch.id); - conn.commit(); - } catch (SQLException e) { - throw propagate(e); - } - } - - @Override public void rollback(ConnectionHandle ch) { - try { - final Connection conn = getConnection(ch.id); - conn.rollback(); - } catch (SQLException e) { - throw propagate(e); - } - } - - /** Configurable statement cache settings. */ - public enum StatementCacheSettings { - /** JDBC connection property for setting connection cache concurrency level. */ - CONCURRENCY_LEVEL(STMT_CACHE_KEY_BASE + ".concurrency", "100"), - - /** JDBC connection property for setting connection cache initial capacity. */ - INITIAL_CAPACITY(STMT_CACHE_KEY_BASE + ".initialcapacity", "1000"), - - /** JDBC connection property for setting connection cache maximum capacity. 
*/ - MAX_CAPACITY(STMT_CACHE_KEY_BASE + ".maxcapacity", "10000"), - - /** JDBC connection property for setting connection cache expiration duration. - * - * <p>Used in conjunction with {@link #EXPIRY_UNIT}.</p> - */ - EXPIRY_DURATION(STMT_CACHE_KEY_BASE + ".expirydiration", "5"), - - /** JDBC connection property for setting connection cache expiration unit. - * - * <p>Used in conjunction with {@link #EXPIRY_DURATION}.</p> - */ - EXPIRY_UNIT(STMT_CACHE_KEY_BASE + ".expiryunit", TimeUnit.MINUTES.name()); - - private final String key; - private final String defaultValue; - - StatementCacheSettings(String key, String defaultValue) { - this.key = key; - this.defaultValue = defaultValue; - } - - /** The configuration key for specifying this setting. */ - public String key() { - return key; - } - - /** The default value for this setting. */ - public String defaultValue() { - return defaultValue; - } - } - - /** Configurable connection cache settings. */ - public enum ConnectionCacheSettings { - /** JDBC connection property for setting connection cache concurrency level. */ - CONCURRENCY_LEVEL(CONN_CACHE_KEY_BASE + ".concurrency", "10"), - - /** JDBC connection property for setting connection cache initial capacity. */ - INITIAL_CAPACITY(CONN_CACHE_KEY_BASE + ".initialcapacity", "100"), - - /** JDBC connection property for setting connection cache maximum capacity. */ - MAX_CAPACITY(CONN_CACHE_KEY_BASE + ".maxcapacity", "1000"), - - /** JDBC connection property for setting connection cache expiration duration. */ - EXPIRY_DURATION(CONN_CACHE_KEY_BASE + ".expiryduration", "10"), - - /** JDBC connection property for setting connection cache expiration unit. 
*/ - EXPIRY_UNIT(CONN_CACHE_KEY_BASE + ".expiryunit", TimeUnit.MINUTES.name()); - - private final String key; - private final String defaultValue; - - ConnectionCacheSettings(String key, String defaultValue) { - this.key = key; - this.defaultValue = defaultValue; - } - - /** The configuration key for specifying this setting. */ - public String key() { - return key; - } - - /** The default value for this setting. */ - public String defaultValue() { - return defaultValue; - } - } - - /** Callback for {@link #connectionCache} member expiration. */ - private class ConnectionExpiryHandler - implements RemovalListener<String, Connection> { - - public void onRemoval(RemovalNotification<String, Connection> notification) { - String connectionId = notification.getKey(); - Connection doomed = notification.getValue(); - LOG.debug("Expiring connection {} because {}", connectionId, notification.getCause()); - try { - if (doomed != null) { - doomed.close(); - } - } catch (Throwable t) { - LOG.info("Exception thrown while expiring connection {}", connectionId, t); - } - } - } - - /** Callback for {@link #statementCache} member expiration. */ - private class StatementExpiryHandler - implements RemovalListener<Integer, StatementInfo> { - public void onRemoval(RemovalNotification<Integer, StatementInfo> notification) { - Integer stmtId = notification.getKey(); - StatementInfo doomed = notification.getValue(); - if (doomed == null) { - // log/throw? - return; - } - LOG.debug("Expiring statement {} because {}", stmtId, notification.getCause()); - try { - if (doomed.getResultSet() != null) { - doomed.getResultSet().close(); - } - if (doomed.statement != null) { - doomed.statement.close(); - } - } catch (Throwable t) { - LOG.info("Exception thrown while expiring statement {}", stmtId, t); - } - } - } -} - -// End JdbcMeta.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java deleted file mode 100644 index 6630124..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/JdbcResultSet.java +++ /dev/null @@ -1,215 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.jdbc; - -import org.apache.calcite.avatica.AvaticaStatement; -import org.apache.calcite.avatica.Meta; -import org.apache.calcite.avatica.util.DateTimeUtils; - -import java.sql.Array; -import java.sql.Date; -import java.sql.ResultSet; -import java.sql.ResultSetMetaData; -import java.sql.SQLException; -import java.sql.Struct; -import java.sql.Time; -import java.sql.Timestamp; -import java.sql.Types; -import java.util.ArrayList; -import java.util.Calendar; -import java.util.List; -import java.util.TreeMap; - -/** Implementation of {@link org.apache.calcite.avatica.Meta.MetaResultSet} - * upon a JDBC {@link java.sql.ResultSet}. - * - * @see org.apache.calcite.avatica.jdbc.JdbcMeta */ -class JdbcResultSet extends Meta.MetaResultSet { - protected JdbcResultSet(String connectionId, int statementId, - boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame) { - this(connectionId, statementId, ownStatement, signature, firstFrame, -1L); - } - - protected JdbcResultSet(String connectionId, int statementId, - boolean ownStatement, Meta.Signature signature, Meta.Frame firstFrame, - long updateCount) { - super(connectionId, statementId, ownStatement, signature, firstFrame, updateCount); - } - - /** Creates a result set. */ - public static JdbcResultSet create(String connectionId, int statementId, - ResultSet resultSet) { - // -1 still limits to 100 but -2 does not limit to any number - return create(connectionId, statementId, resultSet, - JdbcMeta.UNLIMITED_COUNT); - } - - /** Creates a result set with maxRowCount. - * - * <p>If {@code maxRowCount} is -2 ({@link JdbcMeta#UNLIMITED_COUNT}), - * returns an unlimited number of rows in a single frame; any other - * negative value (typically -1) returns an unlimited number of rows - * in frames of the default frame size. 
*/ - public static JdbcResultSet create(String connectionId, int statementId, - ResultSet resultSet, long maxRowCount) { - try { - Meta.Signature sig = JdbcMeta.signature(resultSet.getMetaData()); - return create(connectionId, statementId, resultSet, maxRowCount, sig); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - public static JdbcResultSet create(String connectionId, int statementId, - ResultSet resultSet, long maxRowCount, Meta.Signature signature) { - try { - final Calendar calendar = Calendar.getInstance(DateTimeUtils.GMT_ZONE); - final int fetchRowCount; - if (maxRowCount == JdbcMeta.UNLIMITED_COUNT) { - fetchRowCount = -1; - } else if (maxRowCount < 0L) { - fetchRowCount = AvaticaStatement.DEFAULT_FETCH_SIZE; - } else if (maxRowCount > AvaticaStatement.DEFAULT_FETCH_SIZE) { - fetchRowCount = AvaticaStatement.DEFAULT_FETCH_SIZE; - } else { - fetchRowCount = (int) maxRowCount; - } - final Meta.Frame firstFrame = frame(null, resultSet, 0, fetchRowCount, calendar); - if (firstFrame.done) { - resultSet.close(); - } - return new JdbcResultSet(connectionId, statementId, true, signature, - firstFrame); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - - /** Creates a empty result set with empty frame */ - public static JdbcResultSet empty(String connectionId, int statementId, - Meta.Signature signature) { - return new JdbcResultSet(connectionId, statementId, true, signature, - Meta.Frame.EMPTY); - } - - /** Creates a result set that only has an update count. */ - public static JdbcResultSet count(String connectionId, int statementId, - int updateCount) { - return new JdbcResultSet(connectionId, statementId, true, null, null, updateCount); - } - - /** Creates a frame containing a given number or unlimited number of rows - * from a result set. 
*/ - static Meta.Frame frame(StatementInfo info, ResultSet resultSet, long offset, - int fetchMaxRowCount, Calendar calendar) throws SQLException { - final ResultSetMetaData metaData = resultSet.getMetaData(); - final int columnCount = metaData.getColumnCount(); - final int[] types = new int[columnCount]; - for (int i = 0; i < types.length; i++) { - types[i] = metaData.getColumnType(i + 1); - } - final List<Object> rows = new ArrayList<>(); - // Meta prepare/prepareAndExecute 0 return 0 row and done - boolean done = fetchMaxRowCount == 0; - for (int i = 0; fetchMaxRowCount < 0 || i < fetchMaxRowCount; i++) { - final boolean hasRow; - if (null != info) { - hasRow = info.next(); - } else { - hasRow = resultSet.next(); - } - if (!hasRow) { - done = true; - resultSet.close(); - break; - } - Object[] columns = new Object[columnCount]; - for (int j = 0; j < columnCount; j++) { - columns[j] = getValue(resultSet, types[j], j, calendar); - } - rows.add(columns); - } - return new Meta.Frame(offset, done, rows); - } - - private static Object getValue(ResultSet resultSet, int type, int j, - Calendar calendar) throws SQLException { - switch (type) { - case Types.BIGINT: - final long aLong = resultSet.getLong(j + 1); - return aLong == 0 && resultSet.wasNull() ? null : aLong; - case Types.INTEGER: - final int anInt = resultSet.getInt(j + 1); - return anInt == 0 && resultSet.wasNull() ? null : anInt; - case Types.SMALLINT: - final short aShort = resultSet.getShort(j + 1); - return aShort == 0 && resultSet.wasNull() ? null : aShort; - case Types.TINYINT: - final byte aByte = resultSet.getByte(j + 1); - return aByte == 0 && resultSet.wasNull() ? null : aByte; - case Types.DOUBLE: - case Types.FLOAT: - final double aDouble = resultSet.getDouble(j + 1); - return aDouble == 0D && resultSet.wasNull() ? null : aDouble; - case Types.REAL: - final float aFloat = resultSet.getFloat(j + 1); - return aFloat == 0D && resultSet.wasNull() ? 
null : aFloat; - case Types.DATE: - final Date aDate = resultSet.getDate(j + 1, calendar); - return aDate == null - ? null - : (int) (aDate.getTime() / DateTimeUtils.MILLIS_PER_DAY); - case Types.TIME: - final Time aTime = resultSet.getTime(j + 1, calendar); - return aTime == null - ? null - : (int) (aTime.getTime() % DateTimeUtils.MILLIS_PER_DAY); - case Types.TIMESTAMP: - final Timestamp aTimestamp = resultSet.getTimestamp(j + 1, calendar); - return aTimestamp == null ? null : aTimestamp.getTime(); - case Types.ARRAY: - final Array array = resultSet.getArray(j + 1); - if (null == array) { - return null; - } - ResultSet arrayValues = array.getResultSet(); - TreeMap<Integer, Object> map = new TreeMap<>(); - while (arrayValues.next()) { - // column 1 is the index in the array, column 2 is the value. - // Recurse on `getValue` to unwrap nested types correctly. - // `j` is zero-indexed and incremented for us, thus we have `1` being used twice. - map.put(arrayValues.getInt(1), getValue(arrayValues, array.getBaseType(), 1, calendar)); - } - // If the result set is not in the same order as the actual Array, TreeMap fixes that. - // Need to make a concrete list to ensure Jackson serialization. 
- //return new ListLike<Object>(new ArrayList<>(map.values()), ListLikeType.ARRAY); - return new ArrayList<>(map.values()); - case Types.STRUCT: - Struct struct = resultSet.getObject(j + 1, Struct.class); - Object[] attrs = struct.getAttributes(); - List<Object> list = new ArrayList<>(attrs.length); - for (Object o : attrs) { - list.add(o); - } - return list; - default: - return resultSet.getObject(j + 1); - } - } -} - -// End JdbcResultSet.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java b/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java deleted file mode 100644 index ff27d05..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/jdbc/StatementInfo.java +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * All we know about a statement. Encapsulates a {@link ResultSet}.
 *
 * <p>Tracks a {@code position} counter alongside the result set so that callers can
 * later advance the cursor to an absolute row offset, either with
 * {@link ResultSet#relative(int)} or, when the driver does not support that, with
 * repeated calls to {@link ResultSet#next()}.
 */
public class StatementInfo {
  // Null until the first attempt to call ResultSet.relative(int) tells us whether the
  // driver supports it.
  private volatile Boolean relativeSupported = null;

  final Statement statement; // sometimes a PreparedStatement
  private ResultSet resultSet;
  private long position = 0;

  // True when setResultSet(ResultSet) is called to let us determine the difference between
  // a null ResultSet (from an update) from the lack of a ResultSet.
  private boolean resultsInitialized = false;

  /**
   * @param statement The JDBC statement being tracked, must not be null
   * @throws NullPointerException if {@code statement} is null
   */
  public StatementInfo(Statement statement) {
    this.statement = Objects.requireNonNull(statement);
  }

  // Visible for testing
  void setPosition(long position) {
    this.position = position;
  }

  // Visible for testing
  long getPosition() {
    return this.position;
  }

  /**
   * Set a ResultSet on this object.
   *
   * @param resultSet The current ResultSet
   */
  public void setResultSet(ResultSet resultSet) {
    resultsInitialized = true;
    this.resultSet = resultSet;
  }

  /**
   * @return The {@link ResultSet} for this Statement, may be null.
   */
  public ResultSet getResultSet() {
    return this.resultSet;
  }

  /**
   * @return True if {@link #setResultSet(ResultSet)} was ever invoked.
   */
  public boolean isResultSetInitialized() {
    return resultsInitialized;
  }

  /**
   * Advances the encapsulated result set by one row.
   *
   * @see ResultSet#next()
   */
  public boolean next() throws SQLException {
    return _next(resultSet);
  }

  /**
   * Calls {@link ResultSet#next()} on {@code results} and increments the position,
   * whether or not a row was available.
   */
  boolean _next(ResultSet results) throws SQLException {
    boolean ret = results.next();
    position++;
    return ret;
  }

  /**
   * Consumes <code>offset - position</code> elements from the {@link ResultSet}.
   *
   * @param results The result set to advance
   * @param offset The offset to advance to
   * @return True if the resultSet was advanced to the current point, false if insufficient rows
   *      were present to advance to the requested offset.
   * @throws IllegalArgumentException if {@code offset} is negative or less than the current
   *      position
   */
  public boolean advanceResultSetToOffset(ResultSet results, long offset) throws SQLException {
    if (offset < 0 || offset < position) {
      throw new IllegalArgumentException("Offset should be "
        + " non-negative and not less than the current position. " + offset + ", " + position);
    }
    if (position >= offset) {
      return true;
    }

    if (null == relativeSupported) {
      synchronized (this) {
        if (null == relativeSupported) {
          try {
            // The probing call does the real work too: if it returns normally the
            // cursor has already been moved and we are finished.
            final boolean moreResults = advanceByRelative(results, offset);
            relativeSupported = true;
            return moreResults;
          } catch (SQLFeatureNotSupportedException e) {
            // Fall through: the row-by-row fallback below runs outside this
            // synchronized block.
            relativeSupported = false;
          }
        }
      }
    }

    // Either we just learned that relative is unsupported, or the flag was already set
    // (possibly by another thread while we waited for the lock).
    if (relativeSupported) {
      return advanceByRelative(results, offset);
    } else {
      return advanceByNext(results, offset);
    }
  }

  /**
   * Advances to {@code offset} using {@link ResultSet#relative(int)}, in steps of at most
   * {@link Integer#MAX_VALUE} rows.
   *
   * <p>The position is moved forward by each step once {@code relative} has returned, even
   * when it returned false; it is left untouched only for a step whose call throws.
   */
  private boolean advanceByRelative(ResultSet results, long offset) throws SQLException {
    long diff = offset - position;
    while (diff > Integer.MAX_VALUE) {
      final boolean onRow = results.relative(Integer.MAX_VALUE);
      // Only reached when relative(int) did not throw.
      position += Integer.MAX_VALUE;
      if (!onRow) {
        return false;
      }
      diff -= Integer.MAX_VALUE;
    }
    boolean ret = results.relative((int) diff);
    // Only reached when relative(int) did not throw.
    position += diff;
    return ret;
  }

  /**
   * Advances to {@code offset} one row at a time, stopping early (and returning false) as soon
   * as the result set runs out of rows.
   */
  private boolean advanceByNext(ResultSet results, long offset) throws SQLException {
    while (position < offset) {
      // Advance while maintaining `position`
      if (!_next(results)) {
        return false;
      }
    }

    return true;
  }
}

// End StatementInfo.java
*/ -package org.apache.calcite.avatica.jdbc; - - -// End package-info.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java deleted file mode 100644 index 42b13c9..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.server; - -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; - -import org.eclipse.jetty.server.Handler; - -/** - * A custom interface that extends the Jetty interface to enable extra control within Avatica. 
/**
 * A custom interface that extends the Jetty interface to enable extra control within Avatica.
 */
public interface AvaticaHandler extends Handler {

  /**
   * Hands the server's RPC metadata to this handler.
   *
   * <p>What is done with it is up to the implementation: the JSON and Protobuf handlers
   * in this package forward it to their service and to their serialization handler,
   * while {@code DelegatingAvaticaHandler} only logs a warning.
   *
   * @param metadata The RPC metadata to associate with this handler
   */
  void setServerRpcMetadata(RpcMetadataResponse metadata);

}

// End AvaticaHandler.java
- */ -package org.apache.calcite.avatica.server; - -import org.apache.calcite.avatica.AvaticaUtils; -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.Timer; -import org.apache.calcite.avatica.metrics.Timer.Context; -import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; -import org.apache.calcite.avatica.remote.Handler.HandlerResponse; -import org.apache.calcite.avatica.remote.JsonHandler; -import org.apache.calcite.avatica.remote.Service; -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; -import org.apache.calcite.avatica.util.UnsynchronizedBuffer; - -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.calcite.avatica.remote.MetricsHelper.concat; - -import java.io.IOException; -import java.util.Objects; - -import javax.servlet.ServletException; -import javax.servlet.ServletInputStream; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -/** - * Jetty handler that executes Avatica JSON request-responses. 
- */ -public class AvaticaJsonHandler extends AbstractHandler implements MetricsAwareAvaticaHandler { - private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class); - - final Service service; - final JsonHandler jsonHandler; - - final MetricsSystem metrics; - final Timer requestTimer; - - final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; - - public AvaticaJsonHandler(Service service) { - this(service, NoopMetricsSystem.getInstance()); - } - - public AvaticaJsonHandler(Service service, MetricsSystem metrics) { - this.service = Objects.requireNonNull(service); - this.metrics = Objects.requireNonNull(metrics); - // Avatica doesn't have a Guava dependency - this.jsonHandler = new JsonHandler(service, this.metrics); - - // Metrics - this.requestTimer = this.metrics.getTimer( - concat(AvaticaJsonHandler.class, MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); - - this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { - @Override public UnsynchronizedBuffer initialValue() { - return new UnsynchronizedBuffer(); - } - }; - } - - public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - try (final Context ctx = requestTimer.start()) { - response.setContentType("application/json;charset=utf-8"); - response.setStatus(HttpServletResponse.SC_OK); - if (request.getMethod().equals("POST")) { - // First look for a request in the header, then look in the body. - // The latter allows very large requests without hitting HTTP 413. 
- String rawRequest = request.getHeader("request"); - if (rawRequest == null) { - // Avoid a new buffer creation for every HTTP request - final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); - try (ServletInputStream inputStream = request.getInputStream()) { - rawRequest = AvaticaUtils.readFully(inputStream, buffer); - } finally { - // Reset the offset into the buffer after we're done - buffer.reset(); - } - } - final String jsonRequest = - new String(rawRequest.getBytes("ISO-8859-1"), "UTF-8"); - LOG.trace("request: {}", jsonRequest); - - final HandlerResponse<String> jsonResponse = jsonHandler.apply(jsonRequest); - LOG.trace("response: {}", jsonResponse); - baseRequest.setHandled(true); - // Set the status code and write out the response. - response.setStatus(jsonResponse.getStatusCode()); - response.getWriter().println(jsonResponse.getResponse()); - } - } - } - - @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { - // Set the metadata for the normal service calls - service.setRpcMetadata(metadata); - // Also add it to the handler to include with exceptions - jsonHandler.setRpcMetadata(metadata); - } - - @Override public MetricsSystem getMetrics() { - return metrics; - } -} - -// End AvaticaJsonHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java deleted file mode 100644 index 27e73de..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/AvaticaProtobufHandler.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.calcite.avatica.server; - -import org.apache.calcite.avatica.AvaticaUtils; -import org.apache.calcite.avatica.metrics.MetricsSystem; -import org.apache.calcite.avatica.metrics.Timer; -import org.apache.calcite.avatica.metrics.Timer.Context; -import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem; -import org.apache.calcite.avatica.remote.Handler.HandlerResponse; -import org.apache.calcite.avatica.remote.MetricsHelper; -import org.apache.calcite.avatica.remote.ProtobufHandler; -import org.apache.calcite.avatica.remote.ProtobufTranslation; -import org.apache.calcite.avatica.remote.ProtobufTranslationImpl; -import org.apache.calcite.avatica.remote.Service; -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; -import org.apache.calcite.avatica.util.UnsynchronizedBuffer; - -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.handler.AbstractHandler; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Objects; - -import javax.servlet.ServletException; -import javax.servlet.ServletInputStream; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -/** - * Jetty handler that executes Avatica JSON request-responses. 
- */ -public class AvaticaProtobufHandler extends AbstractHandler implements MetricsAwareAvaticaHandler { - private static final Logger LOG = LoggerFactory.getLogger(AvaticaJsonHandler.class); - - private final Service service; - private final ProtobufHandler pbHandler; - private final ProtobufTranslation protobufTranslation; - private final MetricsSystem metrics; - private final Timer requestTimer; - - final ThreadLocal<UnsynchronizedBuffer> threadLocalBuffer; - - public AvaticaProtobufHandler(Service service) { - this(service, NoopMetricsSystem.getInstance()); - } - - public AvaticaProtobufHandler(Service service, MetricsSystem metrics) { - this.service = Objects.requireNonNull(service); - this.metrics = Objects.requireNonNull(metrics); - - this.requestTimer = this.metrics.getTimer( - MetricsHelper.concat(AvaticaProtobufHandler.class, - MetricsAwareAvaticaHandler.REQUEST_TIMER_NAME)); - - this.protobufTranslation = new ProtobufTranslationImpl(); - this.pbHandler = new ProtobufHandler(service, protobufTranslation, metrics); - - this.threadLocalBuffer = new ThreadLocal<UnsynchronizedBuffer>() { - @Override public UnsynchronizedBuffer initialValue() { - return new UnsynchronizedBuffer(); - } - }; - } - - public void handle(String target, Request baseRequest, - HttpServletRequest request, HttpServletResponse response) - throws IOException, ServletException { - try (final Context ctx = this.requestTimer.start()) { - response.setContentType("application/octet-stream;charset=utf-8"); - response.setStatus(HttpServletResponse.SC_OK); - if (request.getMethod().equals("POST")) { - byte[] requestBytes; - // Avoid a new buffer creation for every HTTP request - final UnsynchronizedBuffer buffer = threadLocalBuffer.get(); - try (ServletInputStream inputStream = request.getInputStream()) { - requestBytes = AvaticaUtils.readFullyToBytes(inputStream, buffer); - } finally { - buffer.reset(); - } - - HandlerResponse<byte[]> handlerResponse = pbHandler.apply(requestBytes); - - 
baseRequest.setHandled(true); - response.setStatus(handlerResponse.getStatusCode()); - response.getOutputStream().write(handlerResponse.getResponse()); - } - } - } - - @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { - // Set the metadata for the normal service calls - service.setRpcMetadata(metadata); - // Also add it to the handler to include with exceptions - pbHandler.setRpcMetadata(metadata); - } - - @Override public MetricsSystem getMetrics() { - return this.metrics; - } - -} - -// End AvaticaProtobufHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java deleted file mode 100644 index a574985..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/DelegatingAvaticaHandler.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.calcite.avatica.server; - -import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse; - -import org.eclipse.jetty.server.Handler; -import org.eclipse.jetty.server.Request; -import org.eclipse.jetty.server.Server; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Objects; - -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -/** - * An AvaticaHandler implementation that delegates to a provided Jetty Handler instance. - * - * <p>This implementation provides a no-op implementation for - * {@link #setServerRpcMetadata(org.apache.calcite.avatica.remote.Service.RpcMetadataResponse)}. - * - * Does not implement {@link MetricsAwareAvaticaHandler} as this implementation is only presented - * for backwards compatibility. - */ -public class DelegatingAvaticaHandler implements AvaticaHandler { - private static final Logger LOG = LoggerFactory.getLogger(DelegatingAvaticaHandler.class); - - private final Handler handler; - - public DelegatingAvaticaHandler(Handler handler) { - this.handler = Objects.requireNonNull(handler); - } - - @Override public void handle(String target, Request baseRequest, HttpServletRequest request, - HttpServletResponse response) throws IOException, ServletException { - handler.handle(target, baseRequest, request, response); - } - - @Override public void setServer(Server server) { - handler.setServer(server); - } - - @Override public Server getServer() { - return handler.getServer(); - } - - @Override public void destroy() { - handler.destroy(); - } - - @Override public void start() throws Exception { - handler.start(); - } - - @Override public void stop() throws Exception { - handler.stop(); - } - - @Override public boolean isRunning() { - return handler.isRunning(); - } - - @Override public boolean isStarted() { - return handler.isStarted(); - } - - @Override public boolean 
isStarting() { - return handler.isStarting(); - } - - @Override public boolean isStopping() { - return handler.isStopping(); - } - - @Override public boolean isStopped() { - return handler.isStopped(); - } - - @Override public boolean isFailed() { - return handler.isFailed(); - } - - @Override public void addLifeCycleListener(Listener listener) { - handler.addLifeCycleListener(listener); - } - - @Override public void removeLifeCycleListener(Listener listener) { - handler.removeLifeCycleListener(listener); - } - - @Override public void setServerRpcMetadata(RpcMetadataResponse metadata) { - LOG.warn("Setting RpcMetadata is not implemented for DelegatingAvaticaHandler"); - } - -} - -// End DelegatingAvaticaHandler.java http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java deleted file mode 100644 index b1fcb40..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HandlerFactory.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.calcite.avatica.server;

import org.apache.calcite.avatica.metrics.MetricsSystem;
import org.apache.calcite.avatica.metrics.MetricsSystemConfiguration;
import org.apache.calcite.avatica.metrics.MetricsSystemFactory;
import org.apache.calcite.avatica.metrics.MetricsSystemLoader;
import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystem;
import org.apache.calcite.avatica.metrics.noop.NoopMetricsSystemConfiguration;
import org.apache.calcite.avatica.remote.Driver;
import org.apache.calcite.avatica.remote.Service;

import org.eclipse.jetty.server.Handler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.ServiceLoader;

/**
 * Factory that instantiates the desired handler implementation for the Avatica server.
 * Implementations typically differ in the method used to serialize messages.
 */
public class HandlerFactory {
  private static final Logger LOG = LoggerFactory.getLogger(HandlerFactory.class);

  /**
   * Constructs the handler for the given serialization method, using the no-op metrics
   * configuration.
   *
   * @param service The underlying {@link Service}.
   * @param serialization The desired message serialization.
   * @return The {@link Handler}.
   */
  public Handler getHandler(Service service, Driver.Serialization serialization) {
    final MetricsSystemConfiguration<?> noMetrics = NoopMetricsSystemConfiguration.getInstance();
    return getHandler(service, serialization, noMetrics);
  }

  /**
   * Constructs the handler for the given serialization method, with a {@link MetricsSystem}
   * obtained from {@link MetricsSystemLoader} for the supplied configuration.
   *
   * @param service The underlying {@link Service}.
   * @param serialization The desired message serialization.
   * @param metricsConfig Configuration for the {@link MetricsSystem}; must not be null.
   * @return The {@link Handler}.
   * @throws IllegalArgumentException if no handler is known for {@code serialization}
   */
  public Handler getHandler(Service service, Driver.Serialization serialization,
      MetricsSystemConfiguration<?> metricsConfig) {
    final MetricsSystemConfiguration<?> checkedConfig = Objects.requireNonNull(metricsConfig);
    final MetricsSystem metricsSystem = MetricsSystemLoader.load(checkedConfig);

    final Handler result;
    switch (serialization) {
    case JSON:
      result = new AvaticaJsonHandler(service, metricsSystem);
      break;
    case PROTOBUF:
      result = new AvaticaProtobufHandler(service, metricsSystem);
      break;
    default:
      throw new IllegalArgumentException("Unknown Avatica handler for " + serialization.name());
    }
    return result;
  }

  /**
   * Load a {@link MetricsSystem} using ServiceLoader to create a {@link MetricsSystemFactory}.
   *
   * <p>Exactly one discovered factory is used to create the result. With no factory, or with
   * more than one, the no-op metrics system is returned instead.
   *
   * @param config State to pass to the factory for initialization.
   * @return A {@link MetricsSystem} instance.
   */
  MetricsSystem loadMetricsSystem(MetricsSystemConfiguration<?> config) {
    final List<MetricsSystemFactory> discovered = new ArrayList<>();
    for (MetricsSystemFactory candidate : ServiceLoader.load(MetricsSystemFactory.class)) {
      discovered.add(candidate);
    }

    if (discovered.isEmpty()) {
      // Nothing on the classpath: run without metrics.
      LOG.info("No metrics implementation available on classpath. Using No-op implementation");
      return NoopMetricsSystem.getInstance();
    }

    if (discovered.size() == 1) {
      // Exactly one candidate: the expected setup.
      final MetricsSystemFactory only = discovered.get(0);
      LOG.info("Loaded MetricsSystem {}", only.getClass());
      return only.create(config);
    }

    // Ambiguous setup: report every candidate and fall back to the no-op implementation.
    final StringBuilder names = new StringBuilder();
    String separator = "";
    for (MetricsSystemFactory candidate : discovered) {
      names.append(separator).append(candidate.getClass());
      separator = ", ";
    }
    LOG.warn("Found multiple MetricsSystemFactory implementations: {}."
        + " Using No-op implementation", names);
    return NoopMetricsSystem.getInstance();
  }
}

// End HandlerFactory.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java deleted file mode 100644 index c81e899..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/HttpServer.java +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */
package org.apache.calcite.avatica.server;

import org.apache.calcite.avatica.remote.Service.RpcMetadataResponse;

import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerList;
import org.eclipse.jetty.util.thread.QueuedThreadPool;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Avatica HTTP server.
 *
 * <p>If you need to change the server's configuration, override the
 * {@link #configureConnector(ServerConnector, int)} method in a derived class.
 *
 * <p>An instance may be started again after {@link #stop()}; every start binds the port that
 * was handed to the constructor.
 */
public class HttpServer {
  private static final Logger LOG = LoggerFactory.getLogger(HttpServer.class);

  /**
   * Port handed to the constructor (the single-argument constructors pass {@code 0}).
   * Kept apart from {@link #port}, which {@link #start()} and {@link #stop()} overwrite, so
   * that a restart does not try to bind the {@code -1} written by {@code stop()}.
   */
  private final int configuredPort;
  private final AvaticaHandler handler;

  /** The running Jetty server, or null while this instance is not started. */
  private Server server;

  /**
   * Value reported by {@link #getPort()}: the constructor port before the first start, the
   * connector's local port after a successful start, and {@code -1} after {@link #stop()}.
   */
  private int port = -1;

  /**
   * Creates a server on port {@code 0} for a plain Jetty handler.
   *
   * @param handler Jetty handler; wrapped in a {@link DelegatingAvaticaHandler} unless it
   *     already is an {@link AvaticaHandler}
   * @deprecated Use {@link #HttpServer(AvaticaHandler)}.
   */
  @Deprecated
  public HttpServer(Handler handler) {
    this(wrapJettyHandler(handler));
  }

  /**
   * Creates a server on port {@code 0}.
   *
   * @param handler handler that serves the requests
   */
  public HttpServer(AvaticaHandler handler) {
    this(0, handler);
  }

  /**
   * Creates a server on the given port for a plain Jetty handler.
   *
   * @param port port passed to {@link #configureConnector(ServerConnector, int)}
   * @param handler Jetty handler; wrapped in a {@link DelegatingAvaticaHandler} unless it
   *     already is an {@link AvaticaHandler}
   * @deprecated Use {@link #HttpServer(int, AvaticaHandler)}.
   */
  @Deprecated
  public HttpServer(int port, Handler handler) {
    this(port, wrapJettyHandler(handler));
  }

  /**
   * Creates a server on the given port.
   *
   * @param port port passed to {@link #configureConnector(ServerConnector, int)}
   * @param handler handler that serves the requests
   */
  public HttpServer(int port, AvaticaHandler handler) {
    this.configuredPort = port;
    this.port = port;
    this.handler = handler;
  }

  /** Returns {@code handler} itself when it is an {@link AvaticaHandler}, else wraps it. */
  private static AvaticaHandler wrapJettyHandler(Handler handler) {
    if (handler instanceof AvaticaHandler) {
      return (AvaticaHandler) handler;
    }
    // Backwards compatibility, noop's the AvaticaHandler interface
    return new DelegatingAvaticaHandler(handler);
  }

  /**
   * Starts the Jetty server, records the port it is listening on and hands the server's
   * address to the handler.
   *
   * @throws RuntimeException if the server is already started, if Jetty fails to start, or
   *     if the local host name cannot be resolved
   */
  public void start() {
    if (server != null) {
      throw new RuntimeException("Server is already started");
    }

    final QueuedThreadPool threadPool = new QueuedThreadPool();
    threadPool.setDaemon(true);
    server = new Server(threadPool);
    server.manage(threadPool);

    // Always bind the constructor's port: the "port" field is overwritten below with the
    // connector's local port, and by stop() with -1, so it must not be reused here.
    final ServerConnector connector =
        configureConnector(new ServerConnector(server), configuredPort);

    server.setConnectors(new Connector[] { connector });

    final HandlerList handlerList = new HandlerList();
    handlerList.setHandlers(new Handler[] { handler, new DefaultHandler() });
    server.setHandler(handlerList);
    try {
      server.start();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
    port = connector.getLocalPort();

    LOG.info("Service listening on port {}.", getPort());

    // Set the information about the address for this server
    try {
      this.handler.setServerRpcMetadata(createRpcServerMetadata(connector));
    } catch (UnknownHostException e) {
      // Failed to do the DNS lookup, bail out.
      throw new RuntimeException(e);
    }
  }

  /**
   * Builds the {@code host:port} metadata describing where the connector is listening.
   *
   * @param connector the started connector
   * @return metadata carrying the server address
   * @throws UnknownHostException if the connector has no host and the local host lookup fails
   */
  private RpcMetadataResponse createRpcServerMetadata(ServerConnector connector) throws
      UnknownHostException {
    String host = connector.getHost();
    if (null == host) {
      // "null" means binding to all interfaces, we need to pick one so the client gets a real
      // address and not "0.0.0.0" or similar.
      host = InetAddress.getLocalHost().getHostName();
    }

    final int localPort = connector.getLocalPort();

    return new RpcMetadataResponse(String.format("%s:%d", host, localPort));
  }

  /**
   * Configures the server connector.
   *
   * <p>The default configuration sets a timeout of 1 minute and disables
   * TCP linger time.
   *
   * <p>To change the configuration, override this method in a derived class.
   * The overriding method must call its super method.
   *
   * @param connector connector to be configured
   * @param port port number handed over in constructor
   * @return the configured connector
   */
  protected ServerConnector configureConnector(ServerConnector connector, int port) {
    connector.setIdleTimeout(60 * 1000);
    connector.setSoLingerTime(-1);
    connector.setPort(port);
    return connector;
  }

  /**
   * Stops the Jetty server and resets the reported port to {@code -1}.
   *
   * @throws RuntimeException if the server is not started, or if Jetty fails to stop
   */
  public void stop() {
    if (server == null) {
      throw new RuntimeException("Server is already stopped");
    }

    LOG.info("Service terminating.");
    try {
      // Clear the state first so the instance counts as stopped even if Jetty's stop fails.
      final Server server1 = server;
      port = -1;
      server = null;
      server1.stop();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Blocks by delegating to the running Jetty server's {@code join()}.
   *
   * @throws IllegalStateException if the server is not started
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public void join() throws InterruptedException {
    // Read the field once: stop() may null it out while another thread is in here.
    final Server current = server;
    if (current == null) {
      throw new IllegalStateException("Server is not started");
    }
    current.join();
  }

  /**
   * Returns the port of this server.
   *
   * @return the constructor port before the first start, the connector's local port while
   *     started, or {@code -1} after {@link #stop()}
   */
  public int getPort() {
    return port;
  }
}

// End HttpServer.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java b/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java deleted file mode 100644 index 8b05931..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/Main.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.calcite.avatica.server;

import org.apache.calcite.avatica.Meta;
import org.apache.calcite.avatica.remote.LocalService;
import org.apache.calcite.avatica.remote.Service;

import org.eclipse.jetty.server.handler.AbstractHandler;

import java.util.Arrays;

/**
 * Command-line entry point that creates a {@link Meta} from a named
 * {@link org.apache.calcite.avatica.Meta.Factory}, wraps it in a {@link LocalService} and
 * serves it from an {@link HttpServer}.
 */
public class Main {
  private Main() {}

  /**
   * Starts the server with JSON serialization and blocks by joining it.
   *
   * @param args Command-line arguments, see {@link #start(String[])}
   */
  public static void main(String[] args)
      throws InterruptedException, ClassNotFoundException,
      IllegalAccessException, InstantiationException {
    HttpServer server = start(args);
    server.join();
  }

  /**
   * Factory that instantiates Jetty Handlers
   */
  public interface HandlerFactory {
    /**
     * Creates the handler that serves the given service.
     *
     * @param service Service to expose
     * @return Handler for the service
     */
    AbstractHandler createHandler(Service service);
  }

  /** Creates {@link AvaticaJsonHandler} instances. */
  private static final HandlerFactory JSON_HANDLER_FACTORY = new HandlerFactory() {
    @Override public AbstractHandler createHandler(Service service) {
      return new AvaticaJsonHandler(service);
    }
  };

  /**
   * Creates and starts an {@link HttpServer} on port 8765 using JSON POJO serialization of
   * requests/responses.
   *
   * <p>Arguments are as follows:
   * <ul>
   *   <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
   *     name
   *   <li>args[1+]: arguments passed along to
   *     {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
   * </ul>
   *
   * @param args Command-line arguments
   * @return The started server
   * @throws IllegalArgumentException if {@code args} is null or empty
   */
  public static HttpServer start(String[] args) throws ClassNotFoundException,
      InstantiationException, IllegalAccessException {
    return start(args, 8765, JSON_HANDLER_FACTORY);
  }

  /**
   * Creates and starts an {@link HttpServer} using the given factory to create the Handler.
   *
   * <p>Arguments are as follows:
   * <ul>
   *   <li>args[0]: the {@link org.apache.calcite.avatica.Meta.Factory} class
   *     name
   *   <li>args[1+]: arguments passed along to
   *     {@link org.apache.calcite.avatica.Meta.Factory#create(java.util.List)}
   * </ul>
   *
   * @param args Command-line arguments
   * @param port Server port to bind
   * @param handlerFactory Factory to create the handler used by the server
   * @return The started server
   * @throws IllegalArgumentException if {@code args} is null or empty
   * @throws ClassNotFoundException if the class named by args[0] cannot be loaded
   * @throws InstantiationException if that class cannot be instantiated
   * @throws IllegalAccessException if its no-argument constructor is not accessible
   */
  public static HttpServer start(String[] args, int port, HandlerFactory handlerFactory)
      throws ClassNotFoundException, InstantiationException,
      IllegalAccessException {
    // Fail with a usable message instead of an ArrayIndexOutOfBoundsException on args[0].
    if (args == null || args.length == 0) {
      throw new IllegalArgumentException(
          "Missing required argument: the Meta.Factory class name");
    }

    String factoryClassName = args[0];
    Class<?> factoryClass = Class.forName(factoryClassName);
    // Class.newInstance() is kept on purpose: the constructor-based replacement throws
    // checked exceptions that this method's signature does not declare.
    Meta.Factory factory = (Meta.Factory) factoryClass.newInstance();
    // Everything after the class name goes to the factory.
    Meta meta = factory.create(Arrays.asList(args).subList(1, args.length));
    Service service = new LocalService(meta);
    HttpServer server = new HttpServer(port, handlerFactory.createHandler(service));
    server.start();
    return server;
  }
}

// End Main.java
http://git-wip-us.apache.org/repos/asf/calcite/blob/5cee486f/avatica-server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java ---------------------------------------------------------------------- diff --git a/avatica-server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java
b/avatica-server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java deleted file mode 100644 index 0914dbd..0000000 --- a/avatica-server/src/main/java/org/apache/calcite/avatica/server/MetricsAwareAvaticaHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to you under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.calcite.avatica.server;

import org.apache.calcite.avatica.metrics.MetricsSystem;

/**
 * An {@link AvaticaHandler} that additionally exposes the {@link MetricsSystem} it uses to
 * collect metrics.
 */
public interface MetricsAwareAvaticaHandler extends AvaticaHandler {

  /** Common prefix shared by the names of all handler metrics. */
  String HANDLER_PREFIX = "Handler.";

  /** Name of the timer for user requests: {@link #HANDLER_PREFIX} plus "RequestTimings". */
  String REQUEST_TIMER_NAME = HANDLER_PREFIX + "RequestTimings";

  /**
   * Returns the metrics system of this handler.
   *
   * @return An instance of the {@link MetricsSystem} for this AvaticaHandler.
   */
  MetricsSystem getMetrics();

}

// End MetricsAwareAvaticaHandler.java
