This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 41ef23dbcd IGNITE-20780 Sql. Move session expiration to IgniteSqlImpl
(#2785)
41ef23dbcd is described below
commit 41ef23dbcde254e0fcf2beaaf84e9399559fe203
Author: korlov42 <[email protected]>
AuthorDate: Mon Nov 6 13:36:19 2023 +0200
IGNITE-20780 Sql. Move session expiration to IgniteSqlImpl (#2785)
---
.../main/java/org/apache/ignite/sql/Session.java | 3 +
.../ignite/internal/client/sql/ClientSession.java | 7 +
.../apache/ignite/client/fakes/FakeSession.java | 6 +
.../ignite/internal/thread/NamedThreadFactory.java | 13 ++
.../ignite/internal/sql/api/ItCommonApiTest.java | 37 +---
.../internal/sql/api/ItSqlClientMetricsTest.java | 2 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 7 +-
modules/sql-engine/build.gradle | 1 +
.../internal/sql/api/AsyncResultSetImpl.java | 42 ++--
.../internal/sql/api/IdleExpirationTracker.java | 99 ++++++++++
.../ignite/internal/sql/api/IgniteSqlImpl.java | 79 +++++++-
.../internal/sql/api/SessionBuilderImpl.java | 57 +++++-
.../ignite/internal/sql/api/SessionImpl.java | 69 ++++++-
.../internal/sql/engine/SqlQueryProcessor.java | 24 +--
.../internal/sql/engine/session/Session.java | 67 +------
.../sql/engine/session/SessionManager.java | 97 +--------
.../sql/engine/session/SessionProperty.java | 30 ---
.../ignite/internal/sql/api/IgniteSqlImplTest.java | 119 +++++++++++
.../ignite/internal/sql/api/SessionImplTest.java | 219 +++++++++++++++++++++
.../sql/engine/session/SessionManagerTest.java | 97 +--------
20 files changed, 711 insertions(+), 364 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
index 0b10281ae3..a969942acc 100644
--- a/modules/api/src/main/java/org/apache/ignite/sql/Session.java
+++ b/modules/api/src/main/java/org/apache/ignite/sql/Session.java
@@ -311,6 +311,9 @@ public interface Session extends AutoCloseable {
*/
Flow.Publisher<Void> closeReactive();
+ /** Returns {@code true} if this session has been closed, returns
{@code false} otherwise. */
+ boolean closed();
+
/**
* Creates a new session builder from the current session.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
index f5c1f3e2f2..1fc07cf2de 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSession.java
@@ -289,6 +289,13 @@ public class ClientSession implements AbstractSession {
throw new UnsupportedOperationException("Not implemented yet.");
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean closed() {
+ // TODO IGNITE-17134 Cancel/close all active cursors, queries, futures.
+ return false;
+ }
+
/** {@inheritDoc} */
@Override
public SessionBuilder toBuilder() {
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
index ecc38e921b..3e07f3d14a 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeSession.java
@@ -228,6 +228,12 @@ public class FakeSession implements AbstractSession {
throw new UnsupportedOperationException();
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean closed() {
+ return false;
+ }
+
/** {@inheritDoc} */
@Override
public SessionBuilder toBuilder() {
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
index 2db1ac0a53..e9c4636ad4 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/thread/NamedThreadFactory.java
@@ -103,4 +103,17 @@ public class NamedThreadFactory implements ThreadFactory {
public static NamedThreadFactory create(String nodeName, String poolName,
IgniteLogger logger) {
return new NamedThreadFactory(threadPrefix(nodeName, poolName),
logger);
}
+
+ /**
+ * Creates a thread factory based on a node's name and a name of the pool.
+ *
+ * @param nodeName Node name.
+ * @param poolName Pool name.
+ * @param daemon Whether threads created by the factory should be daemon
or not.
+ * @param logger Logger.
+ * @return Thread factory.
+ */
+ public static NamedThreadFactory create(String nodeName, String poolName,
boolean daemon, IgniteLogger logger) {
+ return new NamedThreadFactory(threadPrefix(nodeName, poolName),
daemon, logger);
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
index 07dfa90e1b..dbf57ea4e8 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItCommonApiTest.java
@@ -24,18 +24,15 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Instant;
import java.time.LocalDateTime;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.calcite.schema.SchemaPlus;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.engine.QueryCancelledException;
-import org.apache.ignite.internal.sql.engine.schema.IgniteTable;
-import org.apache.ignite.internal.sql.engine.schema.SqlSchemaManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.ResultSet;
@@ -43,7 +40,6 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/** Test common SQL API. */
@@ -69,12 +65,12 @@ public class ItCommonApiTest extends BaseSqlIntegrationTest
{
ResultSet rs1 = ses1.execute(null, "SELECT id FROM TST");
ResultSet rs2 = ses2.execute(null, "SELECT id FROM TST");
- waitForCondition(() -> queryProcessor().liveSessions().size() == 1,
10_000);
+ assertTrue(waitForCondition(ses1::closed, 10_000));
// first session should no longer exist for the moment
ExecutionException err = assertThrows(ExecutionException.class, () ->
ses1.executeAsync(null, "SELECT 1 + 1").get());
assertThat(err.getCause(), instanceOf(IgniteException.class));
- assertThat(err.getCause().getMessage(), containsString("Session not
found"));
+ assertThat(err.getCause().getMessage(), containsString("Session is
closed"));
// already started query should fail due to session has been expired
assertThrowsWithCause(() -> {
@@ -143,31 +139,4 @@ public class ItCommonApiTest extends
BaseSqlIntegrationTest {
assertEquals(expDateTimeStr,
res.next().datetimeValue(1).toString());
}
}
-
- private static class ErroneousSchemaManager implements SqlSchemaManager {
-
- /** {@inheritDoc} */
- @Override
- public @Nullable SchemaPlus schema(int version) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable SchemaPlus schema(long timestamp) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public CompletableFuture<Void> schemaReadyFuture(int version) {
- throw new UnsupportedOperationException();
- }
-
- /** {@inheritDoc} */
- @Override
- public @Nullable IgniteTable table(int schemaVersion, int tableId) {
- return null;
- }
- }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientMetricsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientMetricsTest.java
index 5e3669ea2d..0598550020 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientMetricsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/api/ItSqlClientMetricsTest.java
@@ -105,7 +105,7 @@ public class ItSqlClientMetricsTest extends
BaseSqlIntegrationTest {
assertTrue(waitForCondition(() ->
queryProcessor().liveSessions().isEmpty(), 10_000));
- assertInternalSqlException("Session not found", () ->
session.execute(null, "SELECT * from " + DEFAULT_TABLE_NAME));
+ assertInternalSqlException("Session is closed", () ->
session.execute(null, "SELECT * from " + DEFAULT_TABLE_NAME));
assertMetricValue(clientMetricSet,
SqlClientMetricSource.METRIC_OPEN_CURSORS, 1);
rs1.close();
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0c0453262c..fce6db22b6 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -210,7 +210,7 @@ public class IgniteImpl implements Ignite {
private final SqlQueryProcessor qryEngine;
/** Sql API facade. */
- private final IgniteSql sql;
+ private final IgniteSqlImpl sql;
/** Configuration manager that handles node (local) configuration. */
private final ConfigurationManager nodeCfgMgr;
@@ -629,7 +629,7 @@ public class IgniteImpl implements Ignite {
systemViewManager
);
- sql = new IgniteSqlImpl(qryEngine, new
IgniteTransactionsImpl(txManager, observableTimestampTracker));
+ sql = new IgniteSqlImpl(name, qryEngine, new
IgniteTransactionsImpl(txManager, observableTimestampTracker));
var deploymentManagerImpl = new DeploymentManagerImpl(
clusterSvc,
@@ -826,7 +826,8 @@ public class IgniteImpl implements Ignite {
indexBuildingManager,
qryEngine,
clientHandlerModule,
- deploymentManager
+ deploymentManager,
+ sql
);
// The system view manager comes last because
other components
diff --git a/modules/sql-engine/build.gradle b/modules/sql-engine/build.gradle
index bd83e64856..b4007747a2 100644
--- a/modules/sql-engine/build.gradle
+++ b/modules/sql-engine/build.gradle
@@ -86,6 +86,7 @@ dependencies {
testImplementation(testFixtures(project(':ignite-configuration')))
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-storage-api')))
+ testImplementation(testFixtures(project(':ignite-sql-engine')))
testImplementation(testFixtures(project(':ignite-distribution-zones')))
testImplementation(testFixtures(project(':ignite-placement-driver-api')))
testImplementation(testFixtures(project(':ignite-vault')))
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
index 195bfebf6d..44ecdf1eff 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/AsyncResultSetImpl.java
@@ -44,10 +44,12 @@ import org.jetbrains.annotations.Nullable;
* Asynchronous result set implementation.
*/
public class AsyncResultSetImpl<T> implements AsyncResultSet<T> {
- private static final CompletableFuture<? extends AsyncResultSet>
HAS_NO_MORE_PAGE_FUTURE =
+ private static final CompletableFuture<? extends AsyncResultSet<?>>
HAS_NO_MORE_PAGE_FUTURE =
CompletableFuture.failedFuture(new
SqlException(CURSOR_NO_MORE_PAGES_ERR, "There are no more pages."));
- private final AsyncSqlCursor<List<Object>> cur;
+ private final IdleExpirationTracker expirationTracker;
+
+ private final AsyncSqlCursor<List<Object>> cursor;
private volatile BatchedResult<List<Object>> curPage;
@@ -58,31 +60,43 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
/**
* Constructor.
*
- * @param cur Asynchronous query cursor.
+ * @param cursor Query cursor representing the result of execution.
+ * @param page Current page.
+ * @param pageSize Size of the page to fetch.
+ * @param expirationTracker A tracker to register any interaction with
given result set.
+ * Used to prevent session from expiration.
+ * @param closeRun Callback to be invoked after result is closed.
*/
- public AsyncResultSetImpl(AsyncSqlCursor<List<Object>> cur,
BatchedResult<List<Object>> page, int pageSize, Runnable closeRun) {
- this.cur = cur;
+ AsyncResultSetImpl(
+ AsyncSqlCursor<List<Object>> cursor,
+ BatchedResult<List<Object>> page,
+ int pageSize,
+ IdleExpirationTracker expirationTracker,
+ Runnable closeRun
+ ) {
+ this.cursor = cursor;
this.curPage = page;
this.pageSize = pageSize;
+ this.expirationTracker = expirationTracker;
this.closeRun = closeRun;
}
/** {@inheritDoc} */
@Override
public @Nullable ResultSetMetadata metadata() {
- return hasRowSet() ? cur.metadata() : null;
+ return hasRowSet() ? cursor.metadata() : null;
}
/** {@inheritDoc} */
@Override
public boolean hasRowSet() {
- return cur.queryType() == SqlQueryType.QUERY || cur.queryType() ==
SqlQueryType.EXPLAIN;
+ return cursor.queryType() == SqlQueryType.QUERY || cursor.queryType()
== SqlQueryType.EXPLAIN;
}
/** {@inheritDoc} */
@Override
public long affectedRows() {
- if (cur.queryType() != SqlQueryType.DML) {
+ if (cursor.queryType() != SqlQueryType.DML) {
return -1;
}
@@ -94,7 +108,7 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
/** {@inheritDoc} */
@Override
public boolean wasApplied() {
- if (cur.queryType() != SqlQueryType.DDL) {
+ if (cursor.queryType() != SqlQueryType.DDL) {
return false;
}
@@ -108,8 +122,10 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
public Iterable<T> currentPage() {
requireResultSet();
+ expirationTracker.touch();
+
final Iterator<List<Object>> it0 = curPage.items().iterator();
- final ResultSetMetadata meta0 = cur.metadata();
+ final ResultSetMetadata meta0 = cursor.metadata();
// TODO: IGNITE-18695 map rows to objects when mapper is provided.
return () -> new TransformingIterator<>(it0, (item) -> (T) new
SqlRowImpl(item, meta0));
@@ -128,10 +144,12 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
public CompletableFuture<? extends AsyncResultSet<T>> fetchNextPage() {
requireResultSet();
+ expirationTracker.touch();
+
if (!hasMorePages()) {
return (CompletableFuture<? extends AsyncResultSet<T>>)
HAS_NO_MORE_PAGE_FUTURE;
} else {
- return cur.requestNextAsync(pageSize)
+ return cursor.requestNextAsync(pageSize)
.thenApply(page -> {
curPage = page;
@@ -149,7 +167,7 @@ public class AsyncResultSetImpl<T> implements
AsyncResultSet<T> {
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
- return cur.closeAsync().thenRun(closeRun);
+ return cursor.closeAsync().thenRun(closeRun);
}
private void requireResultSet() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IdleExpirationTracker.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IdleExpirationTracker.java
new file mode 100644
index 0000000000..a209d8dcc7
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IdleExpirationTracker.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
+
+/**
+ * A tracker to check whether a particular object has not been accessed for longer than
the specified duration.
+ *
+ * <p>There are two different types of interaction:<ol>
+ * <li>
+ * {@code touch()} -- updates the last access time. Use this method whenever
access to the object
+ * is considered legitimate and should therefore prevent it from expiring.
+ * </li>
+ * <li>
+ * {@code expired()} -- only checks whether the object has not been accessed for longer
than the specified timeout,
+ * but doesn't refresh the access time.
+ * </li>
+ * </ol>
+ *
+ */
+class IdleExpirationTracker {
+ /** Marker value stored in place of the last-touched timestamp once the tracked object has expired. */
+ private static final long EXPIRED = 0L;
+
+ private final long idleTimeoutMs;
+ private final AtomicLong lastTouched;
+ private final CurrentTimeProvider currentTimeProvider;
+
+ IdleExpirationTracker(
+ long idleTimeoutMs,
+ CurrentTimeProvider currentTimeProvider
+ ) {
+ this.idleTimeoutMs = idleTimeoutMs;
+ this.currentTimeProvider = currentTimeProvider;
+
+ lastTouched = new AtomicLong(currentTimeProvider.now());
+ }
+
+ /**
+ * Checks whether the tracked object has not been accessed for longer than the specified
duration.
+ *
+ * @return {@code true} if object is expired.
+ */
+ @SuppressWarnings("SimplifiableIfStatement")
+ boolean expired() {
+ long last = lastTouched.get();
+
+ if (last == EXPIRED) {
+ return true;
+ }
+
+ return currentTimeProvider.now() - last > idleTimeoutMs
+ && (lastTouched.compareAndSet(last, EXPIRED) ||
lastTouched.get() == EXPIRED);
+ }
+
+ /**
+ * Updates the timestamp that is used to determine whether the object has
expired or not.
+ *
+ * @return {@code true} if this object has been updated, otherwise returns
{@code false}
+ * meaning the object has expired.
+ */
+ boolean touch() {
+ long last;
+ long now;
+ do {
+ last = lastTouched.get();
+
+ // tracker has been marked as expired
+ if (last == EXPIRED) {
+ return false;
+ }
+
+ now = currentTimeProvider.now();
+
+ if (now - last > idleTimeoutMs && expired()) {
+ return false;
+ }
+ } while (!lastTouched.compareAndSet(last, now));
+
+ return true;
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
index 8f674bfe17..b0bfa14a37 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/IgniteSqlImpl.java
@@ -18,31 +18,67 @@
package org.apache.ignite.internal.sql.api;
import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.Session.SessionBuilder;
import org.apache.ignite.sql.Statement;
import org.apache.ignite.sql.Statement.StatementBuilder;
import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.TestOnly;
/**
* Embedded implementation of the Ignite SQL query facade.
*/
-public class IgniteSqlImpl implements IgniteSql {
+public class IgniteSqlImpl implements IgniteSql, IgniteComponent {
+ private static final IgniteLogger LOG =
Loggers.forClass(IgniteSqlImpl.class);
+
+ /** Session expiration check period in milliseconds. */
+ private static final long SESSION_EXPIRE_CHECK_PERIOD =
TimeUnit.SECONDS.toMillis(1);
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
private final QueryProcessor qryProc;
private final IgniteTransactions transactions;
+ /** Session expiration worker. */
+ private final ScheduledExecutorService executor;
+
+ private final ConcurrentMap<SessionId, SessionImpl> sessions = new
ConcurrentHashMap<>();
+
/**
* Constructor.
*
* @param qryProc Query processor.
* @param transactions Transactions facade.
*/
- public IgniteSqlImpl(QueryProcessor qryProc, IgniteTransactions
transactions) {
+ public IgniteSqlImpl(
+ String nodeName,
+ QueryProcessor qryProc,
+ IgniteTransactions transactions
+ ) {
this.qryProc = qryProc;
this.transactions = transactions;
+
+ executor = Executors.newSingleThreadScheduledExecutor(
+ NamedThreadFactory.create(nodeName, "sql-session-cleanup",
true, LOG)
+ );
}
/** {@inheritDoc} */
@@ -54,7 +90,9 @@ public class IgniteSqlImpl implements IgniteSql {
/** {@inheritDoc} */
@Override
public SessionBuilder sessionBuilder() {
- return new SessionBuilderImpl(qryProc, transactions, new HashMap<>());
+ return new SessionBuilderImpl(
+ busyLock, sessions, qryProc, transactions,
System::currentTimeMillis, new HashMap<>()
+ );
}
/** {@inheritDoc} */
@@ -68,4 +106,39 @@ public class IgniteSqlImpl implements IgniteSql {
public StatementBuilder statementBuilder() {
return new StatementBuilderImpl();
}
+
+ @Override
+ public void start() {
+ executor.scheduleWithFixedDelay(
+ () -> {
+ for (SessionImpl session : sessions.values()) {
+ if (session.expired()) {
+ session.closeAsync();
+ }
+ }
+ },
+ SESSION_EXPIRE_CHECK_PERIOD,
+ SESSION_EXPIRE_CHECK_PERIOD,
+ TimeUnit.MILLISECONDS
+ );
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (!closed.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ executor.shutdownNow();
+
+ sessions.values().forEach(SessionImpl::closeAsync);
+ sessions.clear();
+ }
+
+ @TestOnly
+ public List<Session> sessions() {
+ return List.copyOf(sessions.values());
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
index 30eeb59250..f22e224e9d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionBuilderImpl.java
@@ -18,14 +18,18 @@
package org.apache.ignite.internal.sql.api;
import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.sql.AbstractSession;
+import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.QueryProperty;
import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.session.SessionId;
-import org.apache.ignite.internal.sql.engine.session.SessionProperty;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.ErrorGroups.Common;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.Session;
import org.apache.ignite.sql.Session.SessionBuilder;
import org.apache.ignite.tx.IgniteTransactions;
@@ -36,11 +40,17 @@ import org.jetbrains.annotations.Nullable;
*/
public class SessionBuilderImpl implements SessionBuilder {
- public static final long DEFAULT_QUERY_TIMEOUT = 0;
- public static final long DEFAULT_SESSION_TIMEOUT =
TimeUnit.MINUTES.toMillis(5);
+ private static final long DEFAULT_QUERY_TIMEOUT = 0;
+ private static final long DEFAULT_SESSION_TIMEOUT =
TimeUnit.MINUTES.toMillis(5);
private final QueryProcessor qryProc;
+ private final IgniteSpinBusyLock busyLock;
+
+ private final ConcurrentMap<SessionId, SessionImpl> sessions;
+
+ private final CurrentTimeProvider timeProvider;
+
private final Map<String, Object> props;
private IgniteTransactions transactions;
@@ -56,13 +66,27 @@ public class SessionBuilderImpl implements SessionBuilder {
/**
* Session builder constructor.
*
+ * @param busyLock Lock used to synchronise writes to the {@code
sessions}
+ * map, preventing races between clearing the map on node stop and adding a
newly created session.
+ * @param sessions Active sessions. Every session created by this builder
should be added to this map.
* @param qryProc SQL query processor.
* @param transactions Transactions facade.
+ * @param timeProvider Time provider used to check whether a session has expired or
not.
* @param props Initial properties.
*/
- SessionBuilderImpl(QueryProcessor qryProc, IgniteTransactions
transactions, Map<String, Object> props) {
+ SessionBuilderImpl(
+ IgniteSpinBusyLock busyLock,
+ ConcurrentMap<SessionId, SessionImpl> sessions,
+ QueryProcessor qryProc,
+ IgniteTransactions transactions,
+ CurrentTimeProvider timeProvider,
+ Map<String, Object> props
+ ) {
+ this.busyLock = busyLock;
+ this.sessions = sessions;
this.qryProc = qryProc;
this.transactions = transactions;
+ this.timeProvider = timeProvider;
this.props = props;
}
@@ -161,19 +185,38 @@ public class SessionBuilderImpl implements SessionBuilder
{
@Override
public Session build() {
PropertiesHolder propsHolder = PropertiesHelper.newBuilder()
- .set(SessionProperty.IDLE_TIMEOUT, sessionTimeout)
.set(QueryProperty.QUERY_TIMEOUT, queryTimeout)
.set(QueryProperty.DEFAULT_SCHEMA, schema)
.build();
SessionId sessionId = qryProc.createSession(propsHolder);
- return new SessionImpl(
+ SessionImpl session = new SessionImpl(
sessionId,
+ props -> new SessionBuilderImpl(
+ busyLock, sessions, qryProc, transactions,
timeProvider, props
+ ),
qryProc,
transactions,
pageSize,
- propsHolder
+ sessionTimeout,
+ propsHolder,
+ timeProvider,
+ () -> sessions.remove(sessionId)
);
+
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(Common.NODE_STOPPING_ERR, "Node is
stopping.");
+ }
+
+ try {
+ Session old = sessions.put(sessionId, session);
+
+ assert old == null;
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ return session;
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 3c48b78900..e42c85b1ec 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -37,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.sql.AbstractSession;
+import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
import org.apache.ignite.internal.sql.engine.QueryContext;
import org.apache.ignite.internal.sql.engine.QueryProcessor;
import org.apache.ignite.internal.sql.engine.QueryProperty;
@@ -45,7 +46,6 @@ import
org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
import org.apache.ignite.internal.sql.engine.property.Property;
import org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionNotFoundException;
-import org.apache.ignite.internal.sql.engine.session.SessionProperty;
import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -72,6 +72,8 @@ public class SessionImpl implements AbstractSession {
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final SessionBuilderFactory sessionBuilderFactory;
+
private final QueryProcessor qryProc;
private final IgniteTransactions transactions;
@@ -82,26 +84,49 @@ public class SessionImpl implements AbstractSession {
private final PropertiesHolder props;
+ private final long idleTimeoutMs;
+
+ private final IdleExpirationTracker expirationTracker;
+
+ private final Runnable onClose;
+
/**
* Constructor.
*
+ * @param sessionId Identifier of the session.
+ * @param sessionBuilderFactory Factory used to create a session builder from a map of properties.
* @param qryProc Query processor.
* @param transactions Transactions facade.
* @param pageSize Query fetch page size.
+ * @param idleTimeoutMs Duration in milliseconds after which the session
will be considered expired if no action has been
+ * performed on behalf of this session during this
period.
* @param props Session's properties.
+ * @param timeProvider The time provider used to update the timestamp on
every touch of this object.
*/
SessionImpl(
SessionId sessionId,
+ SessionBuilderFactory sessionBuilderFactory,
QueryProcessor qryProc,
IgniteTransactions transactions,
int pageSize,
- PropertiesHolder props
+ long idleTimeoutMs,
+ PropertiesHolder props,
+ CurrentTimeProvider timeProvider,
+ Runnable onClose
) {
this.qryProc = qryProc;
this.transactions = transactions;
this.sessionId = sessionId;
this.pageSize = pageSize;
this.props = props;
+ this.idleTimeoutMs = idleTimeoutMs;
+ this.sessionBuilderFactory = sessionBuilderFactory;
+ this.onClose = onClose;
+
+ expirationTracker = new IdleExpirationTracker(
+ idleTimeoutMs,
+ timeProvider
+ );
}
/** {@inheritDoc} */
@@ -125,7 +150,7 @@ public class SessionImpl implements AbstractSession {
/** {@inheritDoc} */
@Override
public long idleTimeout(TimeUnit timeUnit) {
- return timeUnit.convert(props.get(SessionProperty.IDLE_TIMEOUT),
TimeUnit.MILLISECONDS);
+ return timeUnit.convert(idleTimeoutMs, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
@@ -161,7 +186,7 @@ public class SessionImpl implements AbstractSession {
propertyMap.put(entry.getKey().name, entry.getValue());
}
- return new SessionBuilderImpl(qryProc, transactions, propertyMap)
+ return sessionBuilderFactory.fromProperties(propertyMap)
.defaultPageSize(pageSize);
}
@@ -170,7 +195,10 @@ public class SessionImpl implements AbstractSession {
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
@Nullable Transaction transaction,
String query,
- @Nullable Object... arguments) {
+ @Nullable Object... arguments
+ ) {
+ touchAndCloseIfExpired();
+
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new
SqlException(SESSION_CLOSED_ERR, "Session is closed."));
}
@@ -187,6 +215,7 @@ public class SessionImpl implements AbstractSession {
cur,
batchRes,
pageSize,
+ expirationTracker,
() -> {}
)
)
@@ -242,6 +271,8 @@ public class SessionImpl implements AbstractSession {
/** {@inheritDoc} */
@Override
public CompletableFuture<long[]> executeBatchAsync(@Nullable Transaction
transaction, String query, BatchedArguments batch) {
+ touchAndCloseIfExpired();
+
if (!busyLock.enterBusy()) {
return CompletableFuture.failedFuture(new
SqlException(SESSION_CLOSED_ERR, "Session is closed."));
}
@@ -377,9 +408,26 @@ public class SessionImpl implements AbstractSession {
throw new UnsupportedOperationException("Not implemented yet.");
}
+ /** {@inheritDoc} */
+ @Override
+ public boolean closed() {
+ return closed.get();
+ }
+
+ boolean expired() {
+ return expirationTracker.expired();
+ }
+
+ SessionId id() {
+ return sessionId;
+ }
+
+ @SuppressWarnings("resource")
private void closeInternal() {
if (closed.compareAndSet(false, true)) {
busyLock.block();
+
+ onClose.run();
}
}
@@ -392,4 +440,15 @@ public class SessionImpl implements AbstractSession {
throw new IgniteInternalException(INTERNAL_ERR, "Invalid DML
results: " + page);
}
}
+
+ private void touchAndCloseIfExpired() {
+ if (!expirationTracker.touch()) {
+ closeAsync();
+ }
+ }
+
+ @FunctionalInterface
+ interface SessionBuilderFactory {
+ SessionBuilder fromProperties(Map<String, Object> properties);
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index d0f43740f5..567cf8e855 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -31,7 +31,6 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
@@ -80,7 +79,6 @@ import
org.apache.ignite.internal.sql.engine.session.SessionId;
import org.apache.ignite.internal.sql.engine.session.SessionInfo;
import org.apache.ignite.internal.sql.engine.session.SessionManager;
import org.apache.ignite.internal.sql.engine.session.SessionNotFoundException;
-import org.apache.ignite.internal.sql.engine.session.SessionProperty;
import org.apache.ignite.internal.sql.engine.sql.ParsedResult;
import org.apache.ignite.internal.sql.engine.sql.ParserService;
import org.apache.ignite.internal.sql.engine.sql.ParserServiceImpl;
@@ -123,21 +121,11 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Number of the schemas in cache. */
private static final int SCHEMA_CACHE_SIZE = 128;
- /** Session expiration check period in milliseconds. */
- private static final long SESSION_EXPIRE_CHECK_PERIOD =
TimeUnit.SECONDS.toMillis(1);
-
- /**
- * Duration in milliseconds after which the session will be considered
expired if no action have been performed
- * on behalf of this session during this period.
- */
- private static final long DEFAULT_SESSION_IDLE_TIMEOUT =
TimeUnit.MINUTES.toMillis(15);
-
/** Name of the default schema. */
private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
private static final PropertiesHolder DEFAULT_PROPERTIES =
PropertiesHelper.newBuilder()
.set(QueryProperty.DEFAULT_SCHEMA, DEFAULT_SCHEMA_NAME)
- .set(SessionProperty.IDLE_TIMEOUT, DEFAULT_SESSION_IDLE_TIMEOUT)
.build();
private static final CacheFactory CACHE_FACTORY =
CaffeineCacheFactory.INSTANCE;
@@ -169,7 +157,7 @@ public class SqlQueryProcessor implements QueryProcessor {
private final SystemViewManager systemViewManager;
- private volatile SessionManager sessionManager;
+ private final SessionManager sessionManager = new SessionManager();
private volatile QueryTaskExecutor taskExecutor;
@@ -232,8 +220,6 @@ public class SqlQueryProcessor implements QueryProcessor {
public synchronized void start() {
var nodeName = clusterSrvc.topologyService().localMember().name();
- sessionManager = registerService(new SessionManager(nodeName,
SESSION_EXPIRE_CHECK_PERIOD, System::currentTimeMillis));
-
taskExecutor = registerService(new QueryTaskExecutorImpl(nodeName));
var mailboxRegistry = registerService(new MailboxRegistryImpl());
@@ -340,7 +326,6 @@ public class SqlQueryProcessor implements QueryProcessor {
properties = PropertiesHelper.merge(properties, DEFAULT_PROPERTIES);
return sessionManager.createSession(
- properties.get(SessionProperty.IDLE_TIMEOUT),
properties
);
}
@@ -455,7 +440,7 @@ public class SqlQueryProcessor implements QueryProcessor {
.parameters(params)
.build();
- return prepareSvc.prepareAsync(result,
ctx).thenApply(plan -> executePlan(session, txWrapper, ctx, plan));
+ return prepareSvc.prepareAsync(result,
ctx).thenApply(plan -> executePlan(txWrapper, ctx, plan));
}).whenComplete((res, ex) -> {
if (ex != null) {
txWrapper.rollback();
@@ -492,7 +477,6 @@ public class SqlQueryProcessor implements QueryProcessor {
}
private AsyncSqlCursor<List<Object>> executePlan(
- Session session,
QueryTransactionWrapper txWrapper,
BaseQueryContext ctx,
QueryPlan plan
@@ -512,15 +496,11 @@ public class SqlQueryProcessor implements QueryProcessor {
@Override
public CompletableFuture<BatchedResult<List<Object>>>
requestNextAsync(int rows) {
- session.touch();
-
return dataCursor.requestNextAsync(rows);
}
@Override
public CompletableFuture<Void> closeAsync() {
- session.touch();
-
if (finished.compareAndSet(false, true)) {
numberOfOpenCursors.decrementAndGet();
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
index ede4c7bb9a..33ba8488f1 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/Session.java
@@ -23,12 +23,10 @@ import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.sql.engine.AsyncCloseable;
-import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
/**
@@ -38,9 +36,6 @@ import
org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
* with all properties set during session's creation.
*/
public class Session implements AsyncCloseable {
- /** Marker used to mark a session which has been expired. */
- private static final long EXPIRED = 0L;
-
private final Set<AsyncCloseable> resources = Collections.synchronizedSet(
Collections.newSetFromMap(new IdentityHashMap<>())
);
@@ -49,33 +44,24 @@ public class Session implements AsyncCloseable {
private final AtomicReference<CompletableFuture<Void>> closeFutRef = new
AtomicReference<>();
- private final long idleTimeoutMs;
private final SessionId sessionId;
- private final AtomicLong lastTouched;
private final PropertiesHolder properties;
- private final CurrentTimeProvider currentTimeProvider;
+ private final Runnable onClose;
/**
* Constructor.
*
* @param sessionId A session identifier.
- * @param currentTimeProvider The time provider used to update the
timestamp on every touch of this object.
- * @param idleTimeoutMs Duration in milliseconds after which the session
will be considered expired if no action have been
- * performed on behalf of this session during this
period.
* @param properties The properties to keep within.
*/
public Session(
SessionId sessionId,
- CurrentTimeProvider currentTimeProvider,
- long idleTimeoutMs,
- PropertiesHolder properties
+ PropertiesHolder properties,
+ Runnable onClose
) {
this.sessionId = sessionId;
- this.currentTimeProvider = currentTimeProvider;
- this.idleTimeoutMs = idleTimeoutMs;
this.properties = properties;
-
- lastTouched = new AtomicLong(currentTimeProvider.now());
+ this.onClose = onClose;
}
/** Returns the properties this session associated with. */
@@ -88,43 +74,6 @@ public class Session implements AsyncCloseable {
return sessionId;
}
- /** Returns the duration in millis after which the session will be
considered expired if no one touched it in the middle. */
- public long idleTimeoutMs() {
- return idleTimeoutMs;
- }
-
- /** Checks whether the given session has expired or not. */
- public boolean expired() {
- var last = lastTouched.get();
-
- if (last == EXPIRED) {
- return true;
- }
-
- return currentTimeProvider.now() - last > idleTimeoutMs
- && (lastTouched.compareAndSet(last, EXPIRED) ||
lastTouched.get() == EXPIRED);
- }
-
- /**
- * Updates the timestamp that is used to determine whether the session has
expired or not.
- *
- * <p>Note: don't forget to touch the session every time you going to use
it, otherwise it might expire in the middle of the operation.
- *
- * @return A {@code true} if this session has been updated, otherwise
returns {@code false} means the session has expired.
- */
- public boolean touch() {
- long time;
- do {
- time = lastTouched.get();
-
- if (time == EXPIRED) {
- return false;
- }
- } while (!lastTouched.compareAndSet(time, currentTimeProvider.now()));
-
- return true;
- }
-
/**
* Registers a resource within current session to release in case this
session will be closed.
*
@@ -138,12 +87,6 @@ public class Session implements AsyncCloseable {
throw new IllegalStateException(format("Attempt to register
resource to an expired session [{}]", sessionId));
}
- if (expired()) {
- lock.readLock().unlock();
-
- throw new IllegalStateException(format("Attempt to register
resource to an expired session [{}]", sessionId));
- }
-
try {
resources.add(resource);
} finally {
@@ -174,7 +117,7 @@ public class Session implements AsyncCloseable {
if (closeFutRef.compareAndSet(null, new CompletableFuture<>())) {
lock.writeLock().lock();
- lastTouched.set(EXPIRED);
+ onClose.run();
var futs = new CompletableFuture[resources.size()];
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
index 055aa03b07..4d2c12e140 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionManager.java
@@ -23,70 +23,23 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.sql.engine.CurrentTimeProvider;
-import org.apache.ignite.internal.sql.engine.exec.LifecycleAware;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.thread.IgniteThread;
-import org.apache.ignite.internal.util.worker.IgniteWorker;
import org.jetbrains.annotations.Nullable;
/**
* A manager of a server side sql sessions.
*/
-public class SessionManager implements LifecycleAware {
- private static final IgniteLogger LOG =
Loggers.forClass(SessionManager.class);
-
+public class SessionManager {
/** Active sessions. */
private final Map<SessionId, Session> activeSessions = new
ConcurrentHashMap<>();
- private final CurrentTimeProvider timeProvider;
-
- /** Session expiration worker. */
- private final IgniteWorker expirationWorker;
-
- private final AtomicBoolean startedFlag = new AtomicBoolean(false);
-
- /**
- * Constructor.
- *
- * @param igniteInstanceName The name of the current node.
- * @param expirationCheckPeriod Time period in milliseconds to check
sessions expiration.
- * @param timeProvider A time provider to use for session management.
- */
- public SessionManager(String igniteInstanceName, long
expirationCheckPeriod, CurrentTimeProvider timeProvider) {
- this.timeProvider = timeProvider;
-
- expirationWorker = new IgniteWorker(LOG, igniteInstanceName,
"session_cleanup-thread", null) {
- @Override
- protected void body() throws InterruptedException {
- while (!isCancelled()) {
- blockingSectionBegin();
- try {
- Thread.sleep(expirationCheckPeriod);
- } finally {
- blockingSectionEnd();
- }
-
-
activeSessions.values().stream().filter(Session::expired).forEach((s) ->
destroySession(s));
-
- LOG.debug("Expired SQL sessions has been cleaned up.
Active sessions [count={}]", activeSessions.size());
- }
- }
- };
- }
-
/**
* Creates a new session.
*
- * @param idleTimeoutMs Duration in milliseconds after which the session
will be considered expired if no action have been performed on
- * behalf of this session during this period.
* @param queryProperties Properties to keep within the session.
* @return A new session.
*/
public SessionId createSession(
- long idleTimeoutMs,
PropertiesHolder queryProperties
) {
var applied = new AtomicBoolean(false);
@@ -99,7 +52,7 @@ public class SessionManager implements LifecycleAware {
activeSessions.computeIfAbsent(sessionId, key -> {
applied.set(true);
- return new Session(key, timeProvider, idleTimeoutMs,
queryProperties);
+ return new Session(key, queryProperties, () ->
activeSessions.remove(key));
});
} while (!applied.get());
@@ -107,20 +60,13 @@ public class SessionManager implements LifecycleAware {
}
/**
- * Returns a session for the given id, or {@code null} if this session
have already expired or never exists.
+ * Returns a session for the given id, or {@code null} if no such session
exists.
*
* @param sessionId An identifier of session of interest.
- * @return A session associated with given id, or {@code null} if this
session have already expired or never exists.
+ * @return A session associated with the given id, or {@code null} if no such
session exists.
*/
public @Nullable Session session(SessionId sessionId) {
- var session = activeSessions.get(sessionId);
-
- if (session != null && !session.touch()) {
- destroySession(session);
- session = null;
- }
-
- return session;
+ return activeSessions.get(sessionId);
}
/**
@@ -129,39 +75,10 @@ public class SessionManager implements LifecycleAware {
* @return List of active sessions
*/
public List<SessionInfo> liveSessions() {
- return activeSessions.values().stream().filter((s) ->
!s.expired()).map(SessionInfo::new).collect(Collectors.toList());
+ return
activeSessions.values().stream().map(SessionInfo::new).collect(Collectors.toList());
}
- /**
- * Destroy a given session.
- *
- * @param session Session which should be destroyed
- */
- private void destroySession(Session session) {
- session.closeAsync();
- activeSessions.remove(session.sessionId());
- }
-
- private SessionId nextSessionId() {
+ private static SessionId nextSessionId() {
return new SessionId(UUID.randomUUID());
}
-
- /**
- * Initialize the service by starting session expiration thread.
- */
- @Override
- public void start() {
- if (startedFlag.compareAndSet(false, true)) {
- IgniteThread expirationThread = new IgniteThread(expirationWorker);
-
- expirationThread.setDaemon(true);
- expirationThread.start();
- }
- }
-
- /** Stop the service by stopping session expiration thread. */
- @Override
- public void stop() {
- expirationWorker.cancel();
- }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionProperty.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionProperty.java
deleted file mode 100644
index 5978f9aa3f..0000000000
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/session/SessionProperty.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.sql.engine.session;
-
-import org.apache.ignite.internal.sql.engine.property.Property;
-
-/**
- * Enumerates the properties related to session management domain.
- */
-public final class SessionProperty {
- /**
- * Duration of inactivity in milliseconds after which the session will be
killed.
- */
- public static final Property<Long> IDLE_TIMEOUT = new
Property<>("session_idle_timeout", Long.class);
-}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
new file mode 100644
index 0000000000..dab87062ae
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/IgniteSqlImplTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.sql.Session;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests to verify {@link IgniteSqlImpl}.
+ */
+@SuppressWarnings({"ThrowableNotThrown", "resource"})
+@ExtendWith(MockitoExtension.class)
+class IgniteSqlImplTest extends BaseIgniteAbstractTest {
+ @Test
+ void allSessionsAreClosedAfterFacadeIsStopped() throws Exception {
+ IgniteSqlImpl facade = newSqlFacade();
+
+ SessionBuilder builder = facade.sessionBuilder();
+
+ List<Session> sessions = new ArrayList<>();
+
+ sessions.add(builder.build());
+ sessions.add(builder.build());
+ sessions.add(facade.sessionBuilder().build());
+ sessions.add(facade.createSession());
+
+ for (Session session : sessions) {
+ assertThat(session.closed(), is(false));
+ }
+
+ facade.stop();
+
+ for (Session session : sessions) {
+ assertThat(session.closed(), is(true));
+ }
+
+ assertThat(facade.sessions(), empty());
+ }
+
+ @Test
+ void itsImpossibleToCreateSessionsAfterFacadeIsStopped() throws Exception {
+ IgniteSqlImpl facade = newSqlFacade();
+
+ SessionBuilder builder = facade.sessionBuilder();
+
+ Session templateSession = builder.build();
+
+ facade.stop();
+
+ IgniteTestUtils.assertThrows(
+ IgniteException.class,
+ builder::build,
+ "Node is stopping"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IgniteException.class,
+ () -> facade.sessionBuilder().build(),
+ "Node is stopping"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IgniteException.class,
+ facade::createSession,
+ "Node is stopping"
+ );
+
+ IgniteTestUtils.assertThrows(
+ IgniteException.class,
+ () -> templateSession.toBuilder().build(),
+ "Node is stopping"
+ );
+
+ assertThat(facade.sessions(), empty());
+ }
+
+ private static IgniteSqlImpl newSqlFacade() {
+ QueryProcessor queryProcessor = mock(QueryProcessor.class);
+
+ when(queryProcessor.createSession(any()))
+ .thenAnswer(ignored -> new SessionId(UUID.randomUUID()));
+
+ return new IgniteSqlImpl("test", queryProcessor,
mock(IgniteTransactions.class));
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
new file mode 100644
index 0000000000..2854b30fd2
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/api/SessionImplTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.api;
+
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.apache.ignite.lang.ErrorGroups.Sql.SESSION_CLOSED_ERR;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.internal.sql.engine.AsyncSqlCursor;
+import org.apache.ignite.internal.sql.engine.QueryProcessor;
+import org.apache.ignite.internal.sql.engine.SqlQueryType;
+import org.apache.ignite.internal.sql.engine.session.SessionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.util.AsyncCursor.BatchedResult;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.sql.BatchedArguments;
+import org.apache.ignite.sql.Session.SessionBuilder;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.IgniteTransactions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests to verify {@link SessionImpl}.
+ */
+@SuppressWarnings({"resource", "ThrowableNotThrown"})
+@ExtendWith(MockitoExtension.class)
+class SessionImplTest extends BaseIgniteAbstractTest {
+ private final ConcurrentMap<SessionId, SessionImpl> sessions = new
ConcurrentHashMap<>();
+
+ private final AtomicLong clock = new AtomicLong();
+
+ @Mock
+ private QueryProcessor queryProcessor;
+
+ @BeforeEach
+ void setUp() {
+ clock.set(1L);
+
+ when(queryProcessor.closeSession(any()))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ }
+
+ @AfterEach
+ void cleanUp() {
+ sessions.values().forEach(SessionImpl::close);
+
+ sessions.clear();
+ }
+
+ @Test
+ void sessionExpiredAfterIdleTimeout() {
+ SessionImpl session = newSession(2);
+ assertThat(session.expired(), is(false));
+
+ // period is too short for the session to expire
+ clock.addAndGet(1);
+ assertThat(session.expired(), is(false));
+
+ // period is long enough for the session to expire
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(true));
+ }
+
+ @Test
+ void sessionCleanUpsItselfFromSessionsMapOnClose() {
+ SessionImpl session = newSession(3);
+ assertThat(sessions.get(session.id()), sameInstance(session));
+
+ session.close();
+
+ assertThat(sessions.get(session.id()), nullValue());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void sessionsShouldNotExpireWhenNewQueryExecuted() {
+ AsyncSqlCursor<List<Object>> result = mock(AsyncSqlCursor.class);
+
+ when(result.requestNextAsync(anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(new
BatchedResult<>(List.of(List.of(0L)), false)));
+
+ when(queryProcessor.querySingleAsync(any(), any(), any(), any(),
any(Object[].class)))
+ .thenReturn(CompletableFuture.completedFuture(result));
+
+ SessionImpl session = newSession(3);
+
+ clock.addAndGet(2);
+
+ await(session.executeAsync(null, "SELECT 1", 1));
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+
+ await(session.executeBatchAsync(null, "SELECT 1",
BatchedArguments.of()));
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+
+ clock.addAndGet(2);
+
+ assertThrowsSqlException(
+ SESSION_CLOSED_ERR,
+ "Session is closed",
+ () -> await(session.executeAsync(null, "SELECT 1"))
+ );
+
+ assertThrowsSqlException(
+ SESSION_CLOSED_ERR,
+ "Session is closed",
+ () -> await(session.executeBatchAsync(null, "SELECT 1",
BatchedArguments.of()))
+ );
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ void sessionsShouldNotExpireWhenResultSetAreProcessed() {
+ AsyncSqlCursor<List<Object>> result = mock(AsyncSqlCursor.class);
+
+ when(result.requestNextAsync(anyInt()))
+ .thenReturn(CompletableFuture.completedFuture(new
BatchedResult<>(List.of(List.of(0L)), false)));
+ when(result.queryType())
+ .thenReturn(SqlQueryType.QUERY);
+
+ when(queryProcessor.querySingleAsync(any(), any(), any(), any(),
any(Object[].class)))
+ .thenReturn(CompletableFuture.completedFuture(result));
+
+ SessionImpl session = newSession(3);
+
+ AsyncResultSet<?> rs = await(session.executeAsync(null, "SELECT 1",
1));
+
+ assertThat(rs, notNullValue());
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+
+ {
+ rs.currentPage();
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+ }
+
+ {
+ rs.currentPage();
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+ }
+
+ {
+ rs.fetchNextPage();
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+ }
+
+ {
+ rs.fetchNextPage();
+
+ clock.addAndGet(2);
+ assertThat(session.expired(), is(false));
+ }
+ }
+
+ private SessionImpl newSession(long idleTimeout) {
+ when(queryProcessor.createSession(any()))
+ .thenAnswer(ignored -> new SessionId(UUID.randomUUID()));
+
+ SessionBuilder builder = new SessionBuilderImpl(
+ new IgniteSpinBusyLock(),
+ sessions,
+ queryProcessor,
+ mock(IgniteTransactions.class),
+ clock::get,
+ new HashMap<>()
+ );
+
+ return (SessionImpl) builder
+ .idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
+ .build();
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
index 3c12319592..fcff46cef4 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/session/SessionManagerTest.java
@@ -17,21 +17,13 @@
package org.apache.ignite.internal.sql.engine.session;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.sql.engine.property.PropertiesHelper;
import org.apache.ignite.internal.sql.engine.property.PropertiesHolder;
-import org.apache.ignite.internal.testframework.IgniteTestUtils;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -41,108 +33,23 @@ import org.junit.jupiter.api.Test;
class SessionManagerTest {
private SessionManager sessionMgr;
- private AtomicLong clock = new AtomicLong(System.currentTimeMillis());
@BeforeEach
void beforeEach() {
- sessionMgr = new SessionManager("test", 20, () -> clock.get());
- }
-
- @AfterEach
- void afterEach() {
- sessionMgr.stop();
+ sessionMgr = new SessionManager();
}
@Test
void sessionGet() {
PropertiesHolder propertiesHolder = PropertiesHelper.emptyHolder();
- SessionId sessionId = sessionMgr.createSession(12345, propertiesHolder);
+ SessionId sessionId = sessionMgr.createSession(propertiesHolder);
Session session = sessionMgr.session(sessionId);
assertNotNull(session);
assertSame(propertiesHolder, session.properties());
- assertEquals(12345, session.idleTimeoutMs());
SessionId unknownSessionId = new SessionId(UUID.randomUUID());
assertNull(sessionMgr.session(unknownSessionId));
}
-
- @Test
- void sessionExpiration() throws InterruptedException {
- clock.set(1);
- SessionId sessionId = sessionMgr.createSession(2, null);
-
- Session session = sessionMgr.session(sessionId);
- assertFalse(session.expired());
-
- //period is small to expire session
- clock.set(2);
- assertFalse(session.expired());
-
- //period is enough to session expired, but we touch session and prolong times live
- clock.set(4);
- assertNotNull(sessionMgr.session(sessionId));
- assertFalse(session.expired());
-
- clock.set(7);
- assertTrue(session.expired());
- assertNull(sessionMgr.session(sessionId));
- // touch session don't change already expire state.
- assertTrue(session.expired());
- }
-
- @Test
- void expirationThreadTests() throws InterruptedException {
- long idleTimeout = 20;
-
- SessionManager sesMgr = new SessionManager("test", 20, System::currentTimeMillis);
- sesMgr.start();
-
- SessionId sessionId1 = sesMgr.createSession(idleTimeout, null);
- SessionId sessionId2 = sesMgr.createSession(idleTimeout, null);
-
- AtomicBoolean signal1 = new AtomicBoolean(false);
- AtomicBoolean signal2 = new AtomicBoolean(false);
-
- Session session1 = sesMgr.session(sessionId1);
- session1.registerResource(() -> {
- signal1.set(true);
- return CompletableFuture.completedFuture(null);
- }
- );
-
- Session session2 = sesMgr.session(sessionId2);
- session2.registerResource(() -> {
- signal2.set(true);
- return CompletableFuture.completedFuture(null);
- }
- );
-
- // waiting for expiration first session meanwhile touch second session.
- IgniteTestUtils.waitForCondition(
- () -> {
- sesMgr.session(sessionId2);
- return signal1.get();
- },
- 10,
- 50);
-
- // The first session should be expired.
- assertNull(sesMgr.session(sessionId1));
- assertTrue(session1.expired());
- // The second session is alive due to it has been touched.
- assertNotNull(sesMgr.session(sessionId2));
- assertFalse(session2.expired());
-
- IgniteTestUtils.waitForCondition(
- () -> signal2.get(),
- 10,
- 50);
-
- assertNull(sesMgr.session(sessionId2));
- assertTrue(session2.expired());
-
- sesMgr.stop();
- }
}