IGNITE-6327: Added thin client configuration. This closes #2672.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9fd82212 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9fd82212 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9fd82212 Branch: refs/heads/ignite-6149 Commit: 9fd82212e528a6287cf89d279416aefa92c0cad5 Parents: b861532 Author: devozerov <ppoze...@gmail.com> Authored: Sat Sep 16 14:57:55 2017 +0300 Committer: devozerov <ppoze...@gmail.com> Committed: Sat Sep 16 14:57:55 2017 +0300 ---------------------------------------------------------------------- .../ClientConnectorConfiguration.java | 275 +++++++++++++++++ .../configuration/IgniteConfiguration.java | 37 ++- .../ignite/configuration/OdbcConfiguration.java | 4 +- .../SqlConnectorConfiguration.java | 7 +- .../ignite/internal/GridKernalContext.java | 5 +- .../ignite/internal/GridKernalContextImpl.java | 11 +- .../apache/ignite/internal/IgniteKernal.java | 4 +- .../internal/jdbc/thin/JdbcThinStatement.java | 4 +- .../internal/jdbc/thin/JdbcThinTcpIo.java | 28 +- .../internal/jdbc/thin/JdbcThinUtils.java | 4 +- .../wal/reader/StandaloneGridKernalContext.java | 4 +- .../odbc/ClientListenerBufferedParser.java | 81 +++++ .../odbc/ClientListenerConnectionContext.java | 62 ++++ .../odbc/ClientListenerMessageParser.java | 39 +++ .../odbc/ClientListenerNioListener.java | 245 +++++++++++++++ .../odbc/ClientListenerNioServerBuffer.java | 114 +++++++ .../odbc/ClientListenerProcessor.java | 308 +++++++++++++++++++ .../odbc/ClientListenerProtocolVersion.java | 116 +++++++ .../processors/odbc/ClientListenerRequest.java | 31 ++ .../odbc/ClientListenerRequestHandler.java | 48 +++ .../odbc/ClientListenerRequestNoId.java | 28 ++ .../processors/odbc/ClientListenerResponse.java | 76 +++++ .../odbc/SqlListenerBufferedParser.java | 81 ----- .../odbc/SqlListenerConnectionContext.java | 62 ---- .../odbc/SqlListenerMessageParser.java | 39 --- .../processors/odbc/SqlListenerNioListener.java | 248 
--------------- .../odbc/SqlListenerNioServerBuffer.java | 114 ------- .../processors/odbc/SqlListenerProcessor.java | 278 ----------------- .../odbc/SqlListenerProtocolVersion.java | 116 ------- .../processors/odbc/SqlListenerRequest.java | 31 -- .../odbc/SqlListenerRequestHandler.java | 48 --- .../processors/odbc/SqlListenerRequestNoId.java | 28 -- .../processors/odbc/SqlListenerResponse.java | 76 ----- .../odbc/jdbc/JdbcConnectionContext.java | 28 +- .../processors/odbc/jdbc/JdbcMessageParser.java | 12 +- .../processors/odbc/jdbc/JdbcRequest.java | 4 +- .../odbc/jdbc/JdbcRequestHandler.java | 64 ++-- .../processors/odbc/jdbc/JdbcResponse.java | 4 +- .../odbc/odbc/OdbcConnectionContext.java | 28 +- .../processors/odbc/odbc/OdbcMessageParser.java | 16 +- .../processors/odbc/odbc/OdbcRequest.java | 4 +- .../odbc/odbc/OdbcRequestHandler.java | 46 +-- .../processors/odbc/odbc/OdbcResponse.java | 4 +- .../client/ClientConnectionContext.java | 22 +- .../platform/client/ClientMessageParser.java | 12 +- .../platform/client/ClientRequest.java | 4 +- .../platform/client/ClientRequestHandler.java | 12 +- .../platform/client/ClientResponse.java | 4 +- .../utils/PlatformConfigurationUtils.java | 4 + .../distributed/IgniteCache150ClientsTest.java | 4 +- .../OdbcConfigurationValidationSelfTest.java | 1 - ...onnectorConfigurationValidationSelfTest.java | 241 +++++++++++++++ ...onnectorConfigurationValidationSelfTest.java | 3 +- .../IgniteCacheQuerySelfTestSuite.java | 2 + .../odbc-test/config/queries-test-default.xml | 4 +- .../odbc-test/config/queries-test-noodbc-32.xml | 2 +- .../odbc-test/config/queries-test-noodbc.xml | 2 +- 57 files changed, 1881 insertions(+), 1298 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java new file mode 100644 index 0000000..1c33a00 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/ClientConnectorConfiguration.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +/** + * Client connector configuration. + */ +public class ClientConnectorConfiguration { + /** Default port. */ + public static final int DFLT_PORT = 10800; + + /** Default port range. */ + public static final int DFLT_PORT_RANGE = 100; + + /** Default socket send and receive buffer size. */ + public static final int DFLT_SOCK_BUF_SIZE = 0; + + /** Default value for {@code TCP_NODELAY} socket option (value is <tt>true</tt>). */ + public static final boolean DFLT_TCP_NO_DELAY = true; + + /** Default max number of open cursors per connection. */ + public static final int DFLT_MAX_OPEN_CURSORS_PER_CONN = 128; + + /** Default size of thread pool. 
*/ + public static final int DFLT_THREAD_POOL_SIZE = IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT; + + /** Host. */ + private String host; + + /** Port. */ + private int port = DFLT_PORT; + + /** Port range. */ + private int portRange = DFLT_PORT_RANGE; + + /** Socket send buffer size. */ + private int sockSndBufSize = DFLT_SOCK_BUF_SIZE; + + /** Socket receive buffer size. */ + private int sockRcvBufSize = DFLT_SOCK_BUF_SIZE; + + /** TCP no delay. */ + private boolean tcpNoDelay = DFLT_TCP_NO_DELAY; + + /** Max number of opened cursors per connection. */ + private int maxOpenCursorsPerConn = DFLT_MAX_OPEN_CURSORS_PER_CONN; + + /** Thread pool size. */ + private int threadPoolSize = DFLT_THREAD_POOL_SIZE; + + /** + * Creates SQL connector configuration with all default values. + */ + public ClientConnectorConfiguration() { + // No-op. + } + + /** + * Creates SQL connector configuration by copying all properties from given configuration. + * + * @param cfg Configuration to copy. + */ + public ClientConnectorConfiguration(ClientConnectorConfiguration cfg) { + assert cfg != null; + + host = cfg.getHost(); + maxOpenCursorsPerConn = cfg.getMaxOpenCursorsPerConnection(); + port = cfg.getPort(); + portRange = cfg.getPortRange(); + sockRcvBufSize = cfg.getSocketReceiveBufferSize(); + sockSndBufSize = cfg.getSocketSendBufferSize(); + tcpNoDelay = cfg.isTcpNoDelay(); + threadPoolSize = cfg.getThreadPoolSize(); + } + + /** + * Get host. + * + * @return Host. + */ + @Nullable public String getHost() { + return host; + } + + /** + * Set host. + * + * @param host Host. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setHost(@Nullable String host) { + this.host = host; + + return this; + } + + /** + * Get port. + * + * @return Port. + */ + public int getPort() { + return port; + } + + /** + * Set port. + * + * @param port Port. + * @return This instance for chaining. 
+ */ + public ClientConnectorConfiguration setPort(int port) { + this.port = port; + + return this; + } + + /** + * Get port range. + * + * @return Port range. + */ + public int getPortRange() { + return portRange; + } + + /** + * Set port range. + * + * @param portRange Port range. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setPortRange(int portRange) { + this.portRange = portRange; + + return this; + } + + /** + * Gets socket send buffer size. When set to zero, operating system default will be used. + * <p> + * Defaults to {@link #DFLT_SOCK_BUF_SIZE}. + * + * @return Socket send buffer size in bytes. + */ + public int getSocketSendBufferSize() { + return sockSndBufSize; + } + + /** + * Sets socket send buffer size. See {@link #getSocketSendBufferSize()} for more information. + * + * @param sockSndBufSize Socket send buffer size in bytes. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setSocketSendBufferSize(int sockSndBufSize) { + this.sockSndBufSize = sockSndBufSize; + + return this; + } + + /** + * Gets socket receive buffer size. When set to zero, operating system default will be used. + * <p> + * Defaults to {@link #DFLT_SOCK_BUF_SIZE}. + * + * @return Socket receive buffer size in bytes. + */ + public int getSocketReceiveBufferSize() { + return sockRcvBufSize; + } + + /** + * Sets socket receive buffer size. See {@link #getSocketReceiveBufferSize()} for more information. + * + * @param sockRcvBufSize Socket receive buffer size in bytes. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setSocketReceiveBufferSize(int sockRcvBufSize) { + this.sockRcvBufSize = sockRcvBufSize; + + return this; + } + + /** + * Get TCP NO_DELAY flag. + * + * @return TCP NO_DELAY flag. + */ + public boolean isTcpNoDelay() { + return tcpNoDelay; + } + + /** + * Set TCP NO_DELAY flag. + * + * @param tcpNoDelay TCP NO_DELAY flag. + * @return This instance for chaining. 
+ */ + public ClientConnectorConfiguration setTcpNoDelay(boolean tcpNoDelay) { + this.tcpNoDelay = tcpNoDelay; + + return this; + } + + /** + * Gets maximum number of opened cursors per connection. + * <p> + * Defaults to {@link #DFLT_MAX_OPEN_CURSORS_PER_CONN}. + * + * @return Maximum number of opened cursors. + */ + public int getMaxOpenCursorsPerConnection() { + return maxOpenCursorsPerConn; + } + + /** + * Sets maximum number of opened cursors per connection. + * + * @param maxOpenCursorsPerConn Maximum number of opened cursors. + * @return This instance for chaining. + */ + public ClientConnectorConfiguration setMaxOpenCursorsPerConnection(int maxOpenCursorsPerConn) { + this.maxOpenCursorsPerConn = maxOpenCursorsPerConn; + + return this; + } + + /** + * Size of thread pool that is in charge of processing SQL requests. + * <p> + * Defaults to {@link #DFLT_THREAD_POOL_SIZE}. + * + * @return Thread pool that is in charge of processing SQL requests. + */ + public int getThreadPoolSize() { + return threadPoolSize; + } + + /** + * Sets thread pool that is in charge of processing SQL requests. See {@link #getThreadPoolSize()} for more + * information. + * + * @param threadPoolSize Thread pool that is in charge of processing SQL requests. + * @return This instance for chaining. 
+ */ + public ClientConnectorConfiguration setThreadPoolSize(int threadPoolSize) { + this.threadPoolSize = threadPoolSize; + + return this; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientConnectorConfiguration.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java index 1cc99da..b445205 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java @@ -74,6 +74,7 @@ import org.apache.ignite.spi.indexing.IndexingSpi; import org.apache.ignite.spi.loadbalancing.LoadBalancingSpi; import org.apache.ignite.spi.loadbalancing.roundrobin.RoundRobinLoadBalancingSpi; import org.apache.ignite.ssl.SslContextFactory; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.plugin.segmentation.SegmentationPolicy.STOP; @@ -467,7 +468,11 @@ public class IgniteConfiguration { private long longQryWarnTimeout = DFLT_LONG_QRY_WARN_TIMEOUT; /** SQL connector configuration. */ - private SqlConnectorConfiguration sqlConnCfg = new SqlConnectorConfiguration(); + @Deprecated + private SqlConnectorConfiguration sqlConnCfg; + + /** Client connector configuration. */ + private ClientConnectorConfiguration cliConnCfg; /** * Creates valid grid configuration with all default values. 
@@ -513,6 +518,7 @@ public class IgniteConfiguration { classLdr = cfg.getClassLoader(); clientFailureDetectionTimeout = cfg.getClientFailureDetectionTimeout(); clientMode = cfg.isClientMode(); + cliConnCfg = cfg.getClientConnectorConfiguration(); connectorCfg = cfg.getConnectorConfiguration(); consistentId = cfg.getConsistentId(); daemon = cfg.isDaemon(); @@ -2483,7 +2489,7 @@ public class IgniteConfiguration { * Gets configuration for ODBC. * * @return ODBC configuration. - * @deprecated Use {@link #getSqlConnectorConfiguration()} instead. + * @deprecated Use {@link #getClientConnectorConfiguration()} instead. */ @Deprecated public OdbcConfiguration getOdbcConfiguration() { @@ -2495,7 +2501,7 @@ public class IgniteConfiguration { * * @param odbcCfg ODBC configuration. * @return {@code this} for chaining. - * @deprecated Use {@link #setSqlConnectorConfiguration(SqlConnectorConfiguration)} instead. + * @deprecated Use {@link #setClientConnectorConfiguration(ClientConnectorConfiguration)} instead. */ @Deprecated public IgniteConfiguration setOdbcConfiguration(OdbcConfiguration odbcCfg) { @@ -2796,7 +2802,9 @@ public class IgniteConfiguration { * * @param sqlConnCfg SQL connector configuration. * @return {@code this} for chaining. + * @deprecated Use {@link #setClientConnectorConfiguration(ClientConnectorConfiguration)} instead. */ + @Deprecated public IgniteConfiguration setSqlConnectorConfiguration(SqlConnectorConfiguration sqlConnCfg) { this.sqlConnCfg = sqlConnCfg; @@ -2807,11 +2815,34 @@ public class IgniteConfiguration { * Gets SQL connector configuration. * * @return SQL connector configuration. + * @deprecated Use {@link #getClientConnectorConfiguration()} instead. */ + @Deprecated public SqlConnectorConfiguration getSqlConnectorConfiguration() { return sqlConnCfg; } + /** + * Sets client connector configuration. + * + * @param cliConnCfg Client connector configuration. + * @return {@code this} for chaining. 
+ */ + public IgniteConfiguration setClientConnectorConfiguration(@Nullable ClientConnectorConfiguration cliConnCfg) { + this.cliConnCfg = cliConnCfg; + + return this; + } + + /** + * Gets client connector configuration. + * + * @return Client connector configuration. + */ + @Nullable public ClientConnectorConfiguration getClientConnectorConfiguration() { + return cliConnCfg; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(IgniteConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java index 6d7ab38..4bc3a7f 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/OdbcConfiguration.java @@ -22,8 +22,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; /** * ODBC configuration. * <p> - * Deprecated as of Apache Ignite 2.1. Please use {@link SqlConnectorConfiguration} and - * {@link IgniteConfiguration#setSqlConnectorConfiguration(SqlConnectorConfiguration)} instead. + * Deprecated as of Apache Ignite 2.1. Please use {@link ClientConnectorConfiguration} and + * {@link IgniteConfiguration#setClientConnectorConfiguration(ClientConnectorConfiguration)} instead. 
*/ @Deprecated public class OdbcConfiguration { http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/configuration/SqlConnectorConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/SqlConnectorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/SqlConnectorConfiguration.java index 9921974..4233d44 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/SqlConnectorConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/SqlConnectorConfiguration.java @@ -21,8 +21,13 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; /** - * SQL connector configuration. + * SQL connection configuration. + * <p> + * Deprecated as of Apache Ignite 2.3. Please use {@link ClientConnectorConfiguration} and + * {@link IgniteConfiguration#setClientConnectorConfiguration(ClientConnectorConfiguration)} instead. */ +@SuppressWarnings("deprecation") +@Deprecated public class SqlConnectorConfiguration { /** Default port. 
*/ public static final int DFLT_PORT = 10800; http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index 48a1e3e..99c7cce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; -import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; @@ -49,7 +48,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; -import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; @@ -335,7 +334,7 @@ public interface GridKernalContext extends 
Iterable<GridComponent> { * * @return SQL listener processor. */ - public SqlListenerProcessor sqlListener(); + public ClientListenerProcessor sqlListener(); /** * @return Plugin processor. http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 89ead1a..07e5970 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager; -import org.apache.ignite.internal.processors.cache.CacheObjectValueContext; import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; @@ -65,7 +64,7 @@ import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; -import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import 
org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; @@ -161,7 +160,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringInclude - private SqlListenerProcessor sqlListenerProc; + private ClientListenerProcessor sqlListenerProc; /** */ @GridToStringInclude @@ -568,8 +567,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable pluginProc = (IgnitePluginProcessor)comp; else if (comp instanceof GridQueryProcessor) qryProc = (GridQueryProcessor)comp; - else if (comp instanceof SqlListenerProcessor) - sqlListenerProc = (SqlListenerProcessor)comp; + else if (comp instanceof ClientListenerProcessor) + sqlListenerProc = (ClientListenerProcessor)comp; else if (comp instanceof DataStructuresProcessor) dataStructuresProc = (DataStructuresProcessor)comp; else if (comp instanceof ClusterProcessor) @@ -825,7 +824,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ - @Override public SqlListenerProcessor sqlListener() { + @Override public ClientListenerProcessor sqlListener() { return sqlListenerProc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 3922b39..67f76e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -129,7 +129,7 @@ import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import 
org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; import org.apache.ignite.internal.processors.nodevalidation.DiscoveryNodeValidationProcessor; import org.apache.ignite.internal.processors.nodevalidation.OsDiscoveryNodeValidationProcessor; -import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformNoopProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.platform.plugin.PlatformPluginProcessor; @@ -944,7 +944,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { startProcessor(new GridClusterStateProcessor(ctx)); startProcessor(new GridCacheProcessor(ctx)); startProcessor(new GridQueryProcessor(ctx)); - startProcessor(new SqlListenerProcessor(ctx)); + startProcessor(new ClientListenerProcessor(ctx)); startProcessor(new GridServiceProcessor(ctx)); startProcessor(new GridTaskSessionProcessor(ctx)); startProcessor(new GridJobProcessor(ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java index 44477cd..d9bef54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinStatement.java @@ -29,7 +29,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.query.SqlQuery; -import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; 
+import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQuery; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcQueryExecuteResult; @@ -380,7 +380,7 @@ public class JdbcThinStatement implements Statement { try { JdbcBatchExecuteResult res = conn.io().batchExecute(conn.getSchema(), batch); - if (res.errorCode() != SqlListenerResponse.STATUS_SUCCESS) + if (res.errorCode() != ClientListenerResponse.STATUS_SUCCESS) throw new BatchUpdateException(res.errorMessage(), null, res.errorCode(), res.updateCounts()); return res.updateCounts(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java index 9775254..27b9407 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinTcpIo.java @@ -28,10 +28,10 @@ import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.BinaryWriterExImpl; import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; -import org.apache.ignite.internal.processors.odbc.SqlListenerNioListener; -import org.apache.ignite.internal.processors.odbc.SqlListenerProtocolVersion; -import org.apache.ignite.internal.processors.odbc.SqlListenerRequest; -import org.apache.ignite.internal.processors.odbc.SqlListenerResponse; +import org.apache.ignite.internal.processors.odbc.ClientListenerNioListener; +import 
org.apache.ignite.internal.processors.odbc.ClientListenerProtocolVersion; +import org.apache.ignite.internal.processors.odbc.ClientListenerRequest; +import org.apache.ignite.internal.processors.odbc.ClientListenerResponse; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteRequest; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcBatchExecuteResult; import org.apache.ignite.internal.processors.odbc.jdbc.JdbcMetaColumnsRequest; @@ -67,10 +67,10 @@ import org.apache.ignite.lang.IgniteProductVersion; */ public class JdbcThinTcpIo { /** Current version. */ - private static final SqlListenerProtocolVersion CURRENT_VER = SqlListenerProtocolVersion.create(2, 1, 5); + private static final ClientListenerProtocolVersion CURRENT_VER = ClientListenerProtocolVersion.create(2, 1, 5); /** Version 2.1.0. */ - private static final SqlListenerProtocolVersion VER_2_1_0 = SqlListenerProtocolVersion.create(2, 1, 0); + private static final ClientListenerProtocolVersion VER_2_1_0 = ClientListenerProtocolVersion.create(2, 1, 0); /** Initial output stream capacity for handshake. */ private static final int HANDSHAKE_MSG_SIZE = 13; @@ -139,7 +139,7 @@ public class JdbcThinTcpIo { private IgniteProductVersion igniteVer; /** Ignite server protocol version. */ - private SqlListenerProtocolVersion srvProtocolVer; + private ClientListenerProtocolVersion srvProtocolVer; /** * Constructor. 
@@ -210,13 +210,13 @@ public class JdbcThinTcpIo { BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE), null, null); - writer.writeByte((byte)SqlListenerRequest.HANDSHAKE); + writer.writeByte((byte) ClientListenerRequest.HANDSHAKE); writer.writeShort(CURRENT_VER.major()); writer.writeShort(CURRENT_VER.minor()); writer.writeShort(CURRENT_VER.maintenance()); - writer.writeByte(SqlListenerNioListener.JDBC_CLIENT); + writer.writeByte(ClientListenerNioListener.JDBC_CLIENT); writer.writeBoolean(distributedJoins); writer.writeBoolean(enforceJoinOrder); @@ -257,7 +257,7 @@ public class JdbcThinTcpIo { String err = reader.readString(); - srvProtocolVer = SqlListenerProtocolVersion.create(maj, min, maintenance); + srvProtocolVer = ClientListenerProtocolVersion.create(maj, min, maintenance); if (VER_2_1_0.equals(srvProtocolVer)) handshake_2_1_0(); @@ -278,13 +278,13 @@ public class JdbcThinTcpIo { BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(HANDSHAKE_MSG_SIZE), null, null); - writer.writeByte((byte)SqlListenerRequest.HANDSHAKE); + writer.writeByte((byte) ClientListenerRequest.HANDSHAKE); writer.writeShort(VER_2_1_0.major()); writer.writeShort(VER_2_1_0.minor()); writer.writeShort(VER_2_1_0.maintenance()); - writer.writeByte(SqlListenerNioListener.JDBC_CLIENT); + writer.writeByte(ClientListenerNioListener.JDBC_CLIENT); writer.writeBoolean(distributedJoins); writer.writeBoolean(enforceJoinOrder); @@ -308,7 +308,7 @@ public class JdbcThinTcpIo { String err = reader.readString(); - SqlListenerProtocolVersion ver = SqlListenerProtocolVersion.create(maj, min, maintenance); + ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(maj, min, maintenance); throw new IgniteCheckedException("Handshake failed [driverProtocolVer=" + CURRENT_VER + ", remoteNodeProtocolVer=" + ver + ", err=" + err + ']'); @@ -354,7 +354,7 @@ public class JdbcThinTcpIo { res.readBinary(reader); - if 
(res.status() != SqlListenerResponse.STATUS_SUCCESS) + if (res.status() != ClientListenerResponse.STATUS_SUCCESS) throw new IgniteCheckedException("Error server response: [req=" + req + ", resp=" + res + ']'); return (R)res.response(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java index bb6eb14..52b3abc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/jdbc/thin/JdbcThinUtils.java @@ -17,7 +17,7 @@ package org.apache.ignite.internal.jdbc.thin; -import org.apache.ignite.configuration.SqlConnectorConfiguration; +import org.apache.ignite.configuration.ClientConnectorConfiguration; import java.sql.Time; import java.sql.Timestamp; @@ -109,7 +109,7 @@ public class JdbcThinUtils { public static final String PROP_AUTO_CLOSE_SERVER_CURSORS = PROP_PREFIX + PARAM_AUTO_CLOSE_SERVER_CURSOR; /** Default port. */ - public static final int DFLT_PORT = SqlConnectorConfiguration.DFLT_PORT; + public static final int DFLT_PORT = ClientConnectorConfiguration.DFLT_PORT; /** * Trim prefix from property. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index c2afdef..07be8b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -61,7 +61,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor; import org.apache.ignite.internal.processors.marshaller.GridMarshallerMappingProcessor; -import org.apache.ignite.internal.processors.odbc.SqlListenerProcessor; +import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor; import org.apache.ignite.internal.processors.platform.PlatformProcessor; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.pool.PoolProcessor; @@ -371,7 +371,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { } /** {@inheritDoc} */ - @Override public SqlListenerProcessor sqlListener() { + @Override public ClientListenerProcessor sqlListener() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java new file mode 100644 index 0000000..eb7bfe8 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerBufferedParser.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.nio.GridNioParser; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +/** + * This class implements stream parser based on {@link ClientListenerNioServerBuffer}. + * <p> + * The rule for this parser is that every message sent over the stream is prepended with + * 4-byte integer header containing message size. 
So, the stream structure is as follows: + * <pre> + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | + * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ + * </pre> + */ +public class ClientListenerBufferedParser implements GridNioParser { + /** Buffer metadata key. */ + private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** {@inheritDoc} */ + @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { + ClientListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY); + + // Decode for a given session is called per one thread, so there should not be any concurrency issues. + // However, we make some additional checks. + if (nioBuf == null) { + nioBuf = new ClientListenerNioServerBuffer(); + + ClientListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); + + assert old == null; + } + + return nioBuf.read(buf); + } + + /** {@inheritDoc} */ + @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { + byte[] msg0 = (byte[])msg; + + ByteBuffer res = ByteBuffer.allocate(msg0.length + 4); + + res.order(ByteOrder.LITTLE_ENDIAN); + + res.putInt(msg0.length); + res.put(msg0); + + res.flip(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return ClientListenerBufferedParser.class.getSimpleName(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java new file mode 100644 index 0000000..3605f03 --- 
/dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerConnectionContext.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.internal.binary.BinaryReaderExImpl; + +/** + * Client listener connection context. + */ +public interface ClientListenerConnectionContext { + /** + * @param ver Version to check. + * @return {@code true} if version is supported. + */ + boolean isVersionSupported(ClientListenerProtocolVersion ver); + + /** + * @return Current context version. + */ + ClientListenerProtocolVersion currentVersion(); + + /** + * Initialize from handshake message. + * + * @param ver Protocol version. + * @param reader Reader set to the configuration part of the handshake message. + */ + void initializeFromHandshake(ClientListenerProtocolVersion ver, BinaryReaderExImpl reader); + + /** + * Handler getter. + * @return Request handler for the connection. + */ + ClientListenerRequestHandler handler(); + + /** + * Parser getter. + * @return Message parser for the connection. 
+ */ + ClientListenerMessageParser parser(); + + /** + * Called whenever client is disconnected due to correct connection close + * or due to {@code IOException} during network operations. + */ + void onDisconnected(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java new file mode 100644 index 0000000..ab80f47 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerMessageParser.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.odbc; + +/** + * Client listener message parser. + */ +public interface ClientListenerMessageParser { + /** + * Decode request from byte array. + * + * @param msg Message. + * @return Request. 
+ */ + public ClientListenerRequest decode(byte[] msg); + + /** + * Encode response to byte array. + * + * @param resp Response. + * @return Message. + */ + public byte[] encode(ClientListenerResponse resp); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java new file mode 100644 index 0000000..e8ba18d --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioListener.java @@ -0,0 +1,245 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.binary.BinaryReaderExImpl; +import org.apache.ignite.internal.binary.BinaryWriterExImpl; +import org.apache.ignite.internal.binary.streams.BinaryHeapInputStream; +import org.apache.ignite.internal.binary.streams.BinaryHeapOutputStream; +import org.apache.ignite.internal.binary.streams.BinaryInputStream; +import org.apache.ignite.internal.processors.odbc.jdbc.JdbcConnectionContext; +import org.apache.ignite.internal.processors.odbc.odbc.OdbcConnectionContext; +import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; +import org.jetbrains.annotations.Nullable; + +/** + * Client message listener. + */ +public class ClientListenerNioListener extends GridNioServerListenerAdapter<byte[]> { + /** ODBC driver handshake code. */ + public static final byte ODBC_CLIENT = 0; + + /** JDBC driver handshake code. */ + public static final byte JDBC_CLIENT = 1; + + /** Thin client handshake code. */ + public static final byte THIN_CLIENT = 2; + + /** Connection-related metadata key. */ + private static final int CONN_CTX_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); + + /** Busy lock. */ + private final GridSpinBusyLock busyLock; + + /** Kernal context. */ + private final GridKernalContext ctx; + + /** Maximum allowed cursors. */ + private final int maxCursors; + + /** Logger. */ + private final IgniteLogger log; + + /** + * Constructor. + * + * @param ctx Context. + * @param busyLock Shutdown busy lock. + * @param maxCursors Maximum allowed cursors. 
+ */ + public ClientListenerNioListener(GridKernalContext ctx, GridSpinBusyLock busyLock, int maxCursors) { + this.ctx = ctx; + this.busyLock = busyLock; + this.maxCursors = maxCursors; + + log = ctx.log(getClass()); + } + + /** {@inheritDoc} */ + @Override public void onConnected(GridNioSession ses) { + if (log.isDebugEnabled()) + log.debug("Client connected: " + ses.remoteAddress()); + } + + /** {@inheritDoc} */ + @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) { + ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); + + if (connCtx != null) + connCtx.onDisconnected(); + + if (log.isDebugEnabled()) { + if (e == null) + log.debug("Client disconnected: " + ses.remoteAddress()); + else + log.debug("Client disconnected due to an error [addr=" + ses.remoteAddress() + ", err=" + e + ']'); + } + } + + /** {@inheritDoc} */ + @Override public void onMessage(GridNioSession ses, byte[] msg) { + assert msg != null; + + ClientListenerConnectionContext connCtx = ses.meta(CONN_CTX_META_KEY); + + if (connCtx == null) { + onHandshake(ses, msg); + + return; + } + + ClientListenerMessageParser parser = connCtx.parser(); + ClientListenerRequestHandler handler = connCtx.handler(); + + ClientListenerRequest req; + + try { + req = parser.decode(msg); + } + catch (Exception e) { + log.error("Failed to parse client request.", e); + + ses.close(); + + return; + } + + assert req != null; + + try { + long startTime = 0; + + if (log.isDebugEnabled()) { + startTime = System.nanoTime(); + + log.debug("Client request received [reqId=" + req.requestId() + ", addr=" + + ses.remoteAddress() + ", req=" + req + ']'); + } + + ClientListenerResponse resp = handler.handle(req); + + if (log.isDebugEnabled()) { + long dur = (System.nanoTime() - startTime) / 1000; + + log.debug("Client request processed [reqId=" + req.requestId() + ", dur(mcs)=" + dur + + ", resp=" + resp.status() + ']'); + } + + byte[] outMsg = parser.encode(resp); + + ses.send(outMsg); + 
} + catch (Exception e) { + log.error("Failed to process client request [req=" + req + ']', e); + + ses.send(parser.encode(handler.handleException(e))); + } + } + + /** + * Perform handshake. + * + * @param ses Session. + * @param msg Message bytes. + */ + private void onHandshake(GridNioSession ses, byte[] msg) { + BinaryInputStream stream = new BinaryHeapInputStream(msg); + + BinaryReaderExImpl reader = new BinaryReaderExImpl(null, stream, null, true); + + byte cmd = reader.readByte(); + + if (cmd != ClientListenerRequest.HANDSHAKE) { + log.error("Unexpected client request (will close session): " + ses.remoteAddress()); + + ses.close(); + + return; + } + + short verMajor = reader.readShort(); + short verMinor = reader.readShort(); + short verMaintenance = reader.readShort(); + + ClientListenerProtocolVersion ver = ClientListenerProtocolVersion.create(verMajor, verMinor, verMaintenance); + + byte clientType = reader.readByte(); + + ClientListenerConnectionContext connCtx = prepareContext(clientType); + + String errMsg = null; + + if (connCtx.isVersionSupported(ver)) { + connCtx.initializeFromHandshake(ver, reader); + + ses.addMeta(CONN_CTX_META_KEY, connCtx); + } + else { + log.warning("Unsupported version: " + ver.toString()); + + errMsg = "Unsupported version."; + } + + // Send response. + BinaryWriterExImpl writer = new BinaryWriterExImpl(null, new BinaryHeapOutputStream(8), null, null); + + if (errMsg == null) + connCtx.handler().writeHandshake(writer); + else { + ClientListenerProtocolVersion currentVer = connCtx.currentVersion(); + + // Failed handshake response + writer.writeBoolean(false); + writer.writeShort(currentVer.major()); + writer.writeShort(currentVer.minor()); + writer.writeShort(currentVer.maintenance()); + writer.doWriteString(errMsg); + } + + ses.send(writer.array()); + } + + /** + * Prepare context. + * + * @param clientType Client type. + * @return Context. 
+ */ + private ClientListenerConnectionContext prepareContext(byte clientType) { + switch (clientType) { + case ODBC_CLIENT: + return new OdbcConnectionContext(ctx, busyLock, maxCursors); + + case JDBC_CLIENT: + return new JdbcConnectionContext(ctx, busyLock, maxCursors); + + case THIN_CLIENT: + return new ClientConnectionContext(ctx); + + default: + throw new IgniteException("Unknown client type: " + clientType); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java new file mode 100644 index 0000000..30ee69f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerNioServerBuffer.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc; + +import org.apache.ignite.IgniteCheckedException; +import org.jetbrains.annotations.Nullable; + +import java.nio.ByteBuffer; + +/** + * Client NIO server buffer. + */ +public class ClientListenerNioServerBuffer { + /** Current message data. */ + private byte[] data; + + /** Count of received bytes of the current message. */ + private int cnt = -4; + + /** Current message size. */ + private int msgSize; + + /** + * Reset buffer state. + */ + public void reset() { + msgSize = 0; + cnt = -4; + data = null; + } + + /** + * Checks whether the byte array is filled. + * + * @return Flag indicating whether byte array is filled or not. + */ + public boolean isFilled() { + return cnt > 0 && cnt == msgSize; + } + + /** + * Get data within the buffer. + * + * @return Data. + */ + public byte[] data() { + return data; + } + + /** + * @param buf Buffer. + * @return Message bytes or {@code null} if message is not fully read yet. + * @throws IgniteCheckedException If failed to parse message. + */ + @Nullable public byte[] read(ByteBuffer buf) throws IgniteCheckedException { + if (cnt < 0) { + for (; cnt < 0 && buf.hasRemaining(); cnt++) + msgSize |= (buf.get() & 0xFF) << (8*(4 + cnt)); + + if (cnt < 0) + return null; + + // If count is 0 then message size should be inited. + if (msgSize <= 0) + throw new IgniteCheckedException("Invalid message size: " + msgSize); + + data = new byte[msgSize]; + } + + assert msgSize > 0; + assert cnt >= 0; + + int remaining = buf.remaining(); + + // If there are more bytes in buffer. + if (remaining > 0) { + int missing = msgSize - cnt; + + // Read only up to message size. + if (missing > 0) { + int len = missing < remaining ? 
missing : remaining; + + buf.get(data, cnt, len); + + cnt += len; + } + } + + if (cnt == msgSize) { + byte[] data0 = data; + + reset(); + + return data0; + } + else + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java new file mode 100644 index 0000000..3462da2 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProcessor.java @@ -0,0 +1,308 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.odbc; + +import java.net.InetAddress; +import java.nio.ByteOrder; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.ClientConnectorConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.OdbcConfiguration; +import org.apache.ignite.configuration.SqlConnectorConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.util.GridSpinBusyLock; +import org.apache.ignite.internal.util.HostAndPortRange; +import org.apache.ignite.internal.util.nio.GridNioAsyncNotifyFilter; +import org.apache.ignite.internal.util.nio.GridNioCodecFilter; +import org.apache.ignite.internal.util.nio.GridNioFilter; +import org.apache.ignite.internal.util.nio.GridNioServer; +import org.apache.ignite.internal.util.nio.GridNioSession; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.IgnitePortProtocol; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; +import org.jetbrains.annotations.Nullable; + +/** + * Client connector processor. + */ +public class ClientListenerProcessor extends GridProcessorAdapter { + /** Default number of selectors. */ + private static final int DFLT_SELECTOR_CNT = Math.min(4, Runtime.getRuntime().availableProcessors()); + + /** Default TCP direct buffer flag. */ + private static final boolean DFLT_TCP_DIRECT_BUF = false; + + /** Busy lock. */ + private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + + /** TCP Server. */ + private GridNioServer<byte[]> srv; + + /** Executor service. */ + private ExecutorService execSvc; + + /** + * @param ctx Kernal context. 
+ */ + public ClientListenerProcessor(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteCheckedException { + IgniteConfiguration cfg = ctx.config(); + + ClientConnectorConfiguration cliConnCfg = prepareConfiguration(cfg); + + if (cliConnCfg != null) { + try { + validateConfiguration(cliConnCfg); + + // Resolve host. + String host = cliConnCfg.getHost(); + + if (host == null) + host = cfg.getLocalHost(); + + InetAddress hostAddr; + + try { + hostAddr = U.resolveLocalHost(host); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to resolve client connector host: " + host, e); + } + + execSvc = new IgniteThreadPoolExecutor( + "client-connector", + cfg.getIgniteInstanceName(), + cliConnCfg.getThreadPoolSize(), + cliConnCfg.getThreadPoolSize(), + 0, + new LinkedBlockingQueue<Runnable>()); + + Exception lastErr = null; + + int portTo = cliConnCfg.getPort() + cliConnCfg.getPortRange(); + + if (portTo <= 0) // Handle int overflow. 
+ portTo = Integer.MAX_VALUE; + + for (int port = cliConnCfg.getPort(); port <= portTo && port <= 65535; port++) { + try { + GridNioFilter[] filters = new GridNioFilter[] { + new GridNioAsyncNotifyFilter(ctx.igniteInstanceName(), execSvc, log) { + @Override public void onSessionOpened(GridNioSession ses) + throws IgniteCheckedException { + proceedSessionOpened(ses); + } + }, + new GridNioCodecFilter(new ClientListenerBufferedParser(), log, false) + }; + + int maxOpenCursors = cliConnCfg.getMaxOpenCursorsPerConnection(); + + GridNioServer<byte[]> srv0 = GridNioServer.<byte[]>builder() + .address(hostAddr) + .port(port) + .listener(new ClientListenerNioListener(ctx, busyLock, maxOpenCursors)) + .logger(log) + .selectorCount(DFLT_SELECTOR_CNT) + .igniteInstanceName(ctx.igniteInstanceName()) + .serverName("client-listener") + .tcpNoDelay(cliConnCfg.isTcpNoDelay()) + .directBuffer(DFLT_TCP_DIRECT_BUF) + .byteOrder(ByteOrder.nativeOrder()) + .socketSendBufferSize(cliConnCfg.getSocketSendBufferSize()) + .socketReceiveBufferSize(cliConnCfg.getSocketReceiveBufferSize()) + .filters(filters) + .directMode(false) + .idleTimeout(Long.MAX_VALUE) + .build(); + + srv0.start(); + + srv = srv0; + + ctx.ports().registerPort(port, IgnitePortProtocol.TCP, getClass()); + + if (log.isInfoEnabled()) + log.info("Client connector processor has started on TCP port " + port); + + lastErr = null; + + break; + } + catch (Exception e) { + lastErr = e; + } + } + + assert (srv != null && lastErr == null) || (srv == null && lastErr != null); + + if (lastErr != null) + throw new IgniteCheckedException("Failed to bind to any [host:port] from the range [" + + "host=" + host + ", portFrom=" + cliConnCfg.getPort() + ", portTo=" + portTo + + ", lastErr=" + lastErr + ']'); + } + catch (Exception e) { + throw new IgniteCheckedException("Failed to start client connector processor.", e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onKernalStop(boolean cancel) { + if (srv != null) { + 
+ busyLock.block(); + + srv.stop(); + + ctx.ports().deregisterPorts(getClass()); + + if (execSvc != null) { + U.shutdownNow(getClass(), execSvc, log); + + execSvc = null; + } + + if (log.isDebugEnabled()) + log.debug("Client connector processor stopped."); + } + } + + /** + * Prepare connector configuration. + * + * @param cfg Ignite configuration. + * @return Connector configuration. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("deprecation") + @Nullable private ClientConnectorConfiguration prepareConfiguration(IgniteConfiguration cfg) + throws IgniteCheckedException { + OdbcConfiguration odbcCfg = cfg.getOdbcConfiguration(); + SqlConnectorConfiguration sqlConnCfg = cfg.getSqlConnectorConfiguration(); + ClientConnectorConfiguration cliConnCfg = cfg.getClientConnectorConfiguration(); + + if (cliConnCfg != null) { + // User set configuration explicitly. Use it, but print a warning about ignored SQL/ODBC configs. + if (odbcCfg != null) { + U.warn(log, "Deprecated " + OdbcConfiguration.class.getSimpleName() + " will be ignored because " + + ClientConnectorConfiguration.class.getSimpleName() + " is set."); + } + + if (sqlConnCfg != null) { + U.warn(log, "Deprecated " + SqlConnectorConfiguration.class.getSimpleName() + " will be ignored " + + "because " + ClientConnectorConfiguration.class.getSimpleName() + " is set."); + } + } + else { + cliConnCfg = new ClientConnectorConfiguration(); + + if (sqlConnCfg != null) { + // Migrate from SQL configuration. 
+ cliConnCfg = new ClientConnectorConfiguration(); + + cliConnCfg.setHost(sqlConnCfg.getHost()); + cliConnCfg.setMaxOpenCursorsPerConnection(sqlConnCfg.getMaxOpenCursorsPerConnection()); + cliConnCfg.setPort(sqlConnCfg.getPort()); + cliConnCfg.setPortRange(sqlConnCfg.getPortRange()); + cliConnCfg.setSocketSendBufferSize(sqlConnCfg.getSocketSendBufferSize()); + cliConnCfg.setSocketReceiveBufferSize(sqlConnCfg.getSocketReceiveBufferSize()); + cliConnCfg.setTcpNoDelay(sqlConnCfg.isTcpNoDelay()); + cliConnCfg.setThreadPoolSize(sqlConnCfg.getThreadPoolSize()); + + U.warn(log, "Automatically converted deprecated " + SqlConnectorConfiguration.class.getSimpleName() + + " to " + ClientConnectorConfiguration.class.getSimpleName() + "."); + + if (odbcCfg != null) { + U.warn(log, "Deprecated " + OdbcConfiguration.class.getSimpleName() + " will be ignored because " + + SqlConnectorConfiguration.class.getSimpleName() + " is set."); + } + } + else if (odbcCfg != null) { + // Migrate from ODBC configuration. + HostAndPortRange hostAndPort = parseOdbcEndpoint(odbcCfg); + + cliConnCfg = new ClientConnectorConfiguration(); + + cliConnCfg.setHost(hostAndPort.host()); + cliConnCfg.setPort(hostAndPort.portFrom()); + cliConnCfg.setPortRange(hostAndPort.portTo() - hostAndPort.portFrom()); + cliConnCfg.setThreadPoolSize(odbcCfg.getThreadPoolSize()); + cliConnCfg.setSocketSendBufferSize(odbcCfg.getSocketSendBufferSize()); + cliConnCfg.setSocketReceiveBufferSize(odbcCfg.getSocketReceiveBufferSize()); + cliConnCfg.setMaxOpenCursorsPerConnection(odbcCfg.getMaxOpenCursors()); + + U.warn(log, "Automatically converted deprecated " + OdbcConfiguration.class.getSimpleName() + + " to " + ClientConnectorConfiguration.class.getSimpleName() + "."); + } + } + + return cliConnCfg; + } + + /** + * Validate client connector configuration. + * + * @param cfg Configuration. + * @throws IgniteCheckedException If failed. 
+     */
+    private void validateConfiguration(ClientConnectorConfiguration cfg) throws IgniteCheckedException {
+        assertParameter(cfg.getPort() > 1024, "port > 1024");
+        assertParameter(cfg.getPort() <= 65535, "port <= 65535");
+        assertParameter(cfg.getPortRange() >= 0, "portRange >= 0");
+        assertParameter(cfg.getSocketSendBufferSize() >= 0, "socketSendBufferSize >= 0");
+        assertParameter(cfg.getSocketReceiveBufferSize() >= 0, "socketReceiveBufferSize >= 0");
+        assertParameter(cfg.getMaxOpenCursorsPerConnection() >= 0, "maxOpenCursorsPerConnection >= 0");
+        assertParameter(cfg.getThreadPoolSize() > 0, "threadPoolSize > 0");
+    }
+
+    /**
+     * Parse ODBC endpoint, falling back to the default host and port range if no address is set.
+     *
+     * @param odbcCfg ODBC configuration.
+     * @return ODBC host and port range.
+     * @throws IgniteCheckedException If failed.
+     */
+    @SuppressWarnings("deprecation")
+    private HostAndPortRange parseOdbcEndpoint(OdbcConfiguration odbcCfg) throws IgniteCheckedException {
+        HostAndPortRange res;
+
+        if (F.isEmpty(odbcCfg.getEndpointAddress())) {
+            res = new HostAndPortRange(OdbcConfiguration.DFLT_TCP_HOST,
+                OdbcConfiguration.DFLT_TCP_PORT_FROM,
+                OdbcConfiguration.DFLT_TCP_PORT_TO
+            );
+        }
+        else {
+            res = HostAndPortRange.parse(odbcCfg.getEndpointAddress(),
+                OdbcConfiguration.DFLT_TCP_PORT_FROM,
+                OdbcConfiguration.DFLT_TCP_PORT_TO,
+                "Failed to parse ODBC endpoint address"
+            );
+        }
+
+        return res;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProtocolVersion.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProtocolVersion.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProtocolVersion.java
new file mode 100644
index 0000000..72936db
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerProtocolVersion.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Client listener protocol version, ordered by major, then minor, then maintenance part.
+ */
+public class ClientListenerProtocolVersion implements Comparable<ClientListenerProtocolVersion> {
+    /** Major part. */
+    private final short major;
+
+    /** Minor part. */
+    private final short minor;
+
+    /** Maintenance part. */
+    private final short maintenance;
+
+    /**
+     * Create version. Each part is narrowed to {@code short} without a range check.
+     *
+     * @param major Major part.
+     * @param minor Minor part.
+     * @param maintenance Maintenance part.
+     * @return Version.
+     */
+    public static ClientListenerProtocolVersion create(int major, int minor, int maintenance) {
+        return new ClientListenerProtocolVersion((short)major, (short)minor, (short)maintenance);
+    }
+
+    /**
+     * Constructor. Private: instances are obtained through {@link #create(int, int, int)}.
+     *
+     * @param major Major part.
+     * @param minor Minor part.
+     * @param maintenance Maintenance part.
+     */
+    private ClientListenerProtocolVersion(short major, short minor, short maintenance) {
+        this.major = major;
+        this.minor = minor;
+        this.maintenance = maintenance;
+    }
+
+    /**
+     * @return Major part.
+     */
+    public short major() {
+        return major;
+    }
+
+    /**
+     * @return Minor part.
+     */
+    public short minor() {
+        return minor;
+    }
+
+    /**
+     * @return Maintenance part.
+     */
+    public short maintenance() {
+        return maintenance;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int compareTo(@NotNull ClientListenerProtocolVersion other) {
+        int res = major - other.major;
+
+        if (res == 0)
+            res = minor - other.minor;
+
+        if (res == 0)
+            res = maintenance - other.maintenance;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return 31 * (31 * major + minor) + maintenance;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object obj) {
+        if (obj != null && obj instanceof ClientListenerProtocolVersion) {
+            ClientListenerProtocolVersion other = (ClientListenerProtocolVersion)obj;
+
+            return F.eq(major, other.major) && F.eq(minor, other.minor) && F.eq(maintenance, other.maintenance);
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(ClientListenerProtocolVersion.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java
new file mode 100644
index 0000000..9f7d075
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *
contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+/**
+ * Client listener command request; every request exposes its ID through {@link #requestId()}.
+ */
+public interface ClientListenerRequest {
+    /** Handshake request. */
+    public static final int HANDSHAKE = 1;
+
+    /**
+     * @return Request ID.
+     */
+    public long requestId();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java
new file mode 100644
index 0000000..e28d0d9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.apache.ignite.internal.binary.BinaryWriterExImpl;
+
+/**
+ * Client listener request handler.
+ */
+public interface ClientListenerRequestHandler {
+    /**
+     * Handle request and produce a response for it.
+     *
+     * @param req Request.
+     * @return Response.
+     */
+    public ClientListenerResponse handle(ClientListenerRequest req);
+
+    /**
+     * Handle exception by converting it to an error response.
+     *
+     * @param e Exception.
+     * @return Error response.
+     */
+    public ClientListenerResponse handleException(Exception e);
+
+    /**
+     * Write successful handshake response.
+     *
+     * @param writer Binary writer to write the handshake response to.
+     */
+    public void writeHandshake(BinaryWriterExImpl writer);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestNoId.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestNoId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestNoId.java
new file mode 100644
index 0000000..40a4485
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerRequestNoId.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+/**
+ * Client request with no ID; {@code requestId()} always returns {@code 0}.
+ */
+public abstract class ClientListenerRequestNoId implements ClientListenerRequest {
+    /** {@inheritDoc} */
+    @Override public long requestId() {
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
new file mode 100644
index 0000000..342062e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/ClientListenerResponse.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.odbc;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client listener response: a status code plus an optional error message.
+ */
+public abstract class ClientListenerResponse {
+    /** Command succeeded. */
+    public static final int STATUS_SUCCESS = 0;
+
+    /** Command failed. */
+    public static final int STATUS_FAILED = 1;
+
+    /** Response status. */
+    private int status;
+
+    /** Error message. */
+    private String err;
+
+    /**
+     * Constructs response with the given status and error message.
+     *
+     * @param status Response status.
+     * @param err Error message, {@code null} if the command succeeded.
+     */
+    public ClientListenerResponse(int status, @Nullable String err) {
+        this.status = status;
+        this.err = err;
+    }
+
+    /**
+     * @return Response status.
+     */
+    public int status() {
+        return status;
+    }
+
+    /**
+     * @param status Status.
+     */
+    public void status(int status) {
+        this.status = status;
+    }
+
+    /**
+     * @return Error message, possibly {@code null}.
+     */
+    public String error() {
+        return err;
+    }
+
+    /**
+     * @param err Error message.
+     */
+    public void error(String err) {
+        this.err = err;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/9fd82212/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerBufferedParser.java
deleted file mode 100644
index a8e124f..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/odbc/SqlListenerBufferedParser.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.odbc; - -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.internal.util.nio.GridNioParser; -import org.apache.ignite.internal.util.nio.GridNioSession; -import org.apache.ignite.internal.util.nio.GridNioSessionMetaKey; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; - -/** - * This class implements stream parser based on {@link SqlListenerNioServerBuffer}. - * <p> - * The rule for this parser is that every message sent over the stream is prepended with - * 4-byte integer header containing message size. So, the stream structure is as follows: - * <pre> - * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ - * | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE | - * +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+ - * </pre> - */ -public class SqlListenerBufferedParser implements GridNioParser { - /** Buffer metadata key. */ - private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey(); - - /** {@inheritDoc} */ - @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException { - SqlListenerNioServerBuffer nioBuf = ses.meta(BUF_META_KEY); - - // Decode for a given session is called per one thread, so there should not be any concurrency issues. - // However, we make some additional checks. 
- if (nioBuf == null) { - nioBuf = new SqlListenerNioServerBuffer(); - - SqlListenerNioServerBuffer old = ses.addMeta(BUF_META_KEY, nioBuf); - - assert old == null; - } - - return nioBuf.read(buf); - } - - /** {@inheritDoc} */ - @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException { - byte[] msg0 = (byte[])msg; - - ByteBuffer res = ByteBuffer.allocate(msg0.length + 4); - - res.order(ByteOrder.LITTLE_ENDIAN); - - res.putInt(msg0.length); - res.put(msg0); - - res.flip(); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return SqlListenerBufferedParser.class.getSimpleName(); - } -} \ No newline at end of file