This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 35f2ff6fc5a Clarify the exception message when using a closed session
(#15811)
35f2ff6fc5a is described below
commit 35f2ff6fc5a69707669cfecfbf262c210638801c
Author: Jiang Tian <[email protected]>
AuthorDate: Wed Jun 25 09:34:20 2025 +0800
Clarify the exception message when using a closed session (#15811)
---
.../org/apache/iotdb/session/it/SessionIT.java | 133 +++++++++--
.../java/org/apache/iotdb/isession/ISession.java | 6 +-
.../java/org/apache/iotdb/session/Session.java | 244 +++++++++++----------
.../apache/iotdb/session/SessionConnection.java | 6 +-
.../subscription/AbstractSubscriptionSession.java | 2 +-
.../subscription/SubscriptionSessionWrapper.java | 4 +-
.../base/AbstractSubscriptionProvider.java | 30 +--
7 files changed, 270 insertions(+), 155 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
index 3bc7a5984aa..52638352ee3 100644
--- a/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/session/it/SessionIT.java
@@ -21,10 +21,14 @@ package org.apache.iotdb.session.it;
import org.apache.iotdb.isession.ISession;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+import org.apache.iotdb.session.Session.Builder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -45,9 +49,11 @@ import org.junit.runner.RunWith;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@@ -107,10 +113,10 @@ public class SessionIT {
int i = 0;
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- Assert.assertEquals(i, record.getFields().get(0).getLongV());
+ assertEquals(i, record.getFields().get(0).getLongV());
Assert.assertNull(record.getFields().get(1).getDataType());
Assert.assertNull(record.getFields().get(2).getDataType());
- Assert.assertEquals(i, record.getFields().get(3).getDoubleV(),
0.00001);
+ assertEquals(i, record.getFields().get(3).getDoubleV(), 0.00001);
i++;
}
@@ -147,7 +153,7 @@ public class SessionIT {
}
try (SessionDataSet dataSet = session.executeQueryStatement("select *
from root.db.d1")) {
HashSet<String> columnNames = new HashSet<>(dataSet.getColumnNames());
- Assert.assertEquals(5, columnNames.size());
+ assertEquals(5, columnNames.size());
for (int i = 0; i < 4; i++) {
Assert.assertTrue(columnNames.contains(deviceId + "." +
measurements.get(i)));
}
@@ -155,22 +161,22 @@ public class SessionIT {
int row = 10;
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- Assert.assertEquals(row, record.getTimestamp());
+ assertEquals(row, record.getTimestamp());
List<Field> fields = record.getFields();
- Assert.assertEquals(4, fields.size());
+ assertEquals(4, fields.size());
for (int i = 0; i < 4; i++) {
switch (fields.get(i).getDataType()) {
case DATE:
- Assert.assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
+ assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
break;
case TIMESTAMP:
- Assert.assertEquals(row, fields.get(i).getLongV());
+ assertEquals(row, fields.get(i).getLongV());
break;
case BLOB:
Assert.assertArrayEquals(bytes,
fields.get(i).getBinaryV().getValues());
break;
case STRING:
- Assert.assertEquals("" + row,
fields.get(i).getBinaryV().toString());
+ assertEquals("" + row, fields.get(i).getBinaryV().toString());
break;
default:
fail("Unsupported data type");
@@ -179,7 +185,7 @@ public class SessionIT {
}
row++;
}
- Assert.assertEquals(20, row);
+ assertEquals(20, row);
}
} catch (Exception e) {
@@ -215,7 +221,7 @@ public class SessionIT {
}
try (SessionDataSet dataSet = session.executeQueryStatement("select *
from root.db.d1")) {
HashSet<String> columnNames = new HashSet<>(dataSet.getColumnNames());
- Assert.assertEquals(5, columnNames.size());
+ assertEquals(5, columnNames.size());
for (int i = 0; i < 4; i++) {
Assert.assertTrue(columnNames.contains(deviceId + "." +
measurements.get(i)));
}
@@ -224,22 +230,22 @@ public class SessionIT {
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
System.out.println(record);
- Assert.assertEquals(row, record.getTimestamp());
+ assertEquals(row, record.getTimestamp());
List<Field> fields = record.getFields();
- Assert.assertEquals(4, fields.size());
+ assertEquals(4, fields.size());
for (int i = 0; i < 4; i++) {
switch (fields.get(i).getDataType()) {
case DATE:
- Assert.assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
+ assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
break;
case TIMESTAMP:
- Assert.assertEquals(row, fields.get(i).getLongV());
+ assertEquals(row, fields.get(i).getLongV());
break;
case BLOB:
Assert.assertArrayEquals(bytes,
fields.get(i).getBinaryV().getValues());
break;
case STRING:
- Assert.assertEquals("" + row,
fields.get(i).getBinaryV().toString());
+ assertEquals("" + row, fields.get(i).getBinaryV().toString());
break;
default:
fail("Unsupported data type");
@@ -248,7 +254,7 @@ public class SessionIT {
}
row++;
}
- Assert.assertEquals(20, row);
+ assertEquals(20, row);
}
} catch (Exception e) {
@@ -295,7 +301,7 @@ public class SessionIT {
tablet.reset();
try (SessionDataSet dataSet = session.executeQueryStatement("select *
from root.db.d1")) {
HashSet<String> columnNames = new HashSet<>(dataSet.getColumnNames());
- Assert.assertEquals(5, columnNames.size());
+ assertEquals(5, columnNames.size());
for (int i = 0; i < 4; i++) {
Assert.assertTrue(
columnNames.contains(deviceId + "." +
schemaList.get(i).getMeasurementName()));
@@ -304,22 +310,22 @@ public class SessionIT {
int row = 10;
while (dataSet.hasNext()) {
RowRecord record = dataSet.next();
- Assert.assertEquals(row, record.getTimestamp());
+ assertEquals(row, record.getTimestamp());
List<Field> fields = record.getFields();
- Assert.assertEquals(4, fields.size());
+ assertEquals(4, fields.size());
for (int i = 0; i < 4; i++) {
switch (fields.get(i).getDataType()) {
case DATE:
- Assert.assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
+ assertEquals(LocalDate.of(2024, 1, row),
fields.get(i).getDateV());
break;
case TIMESTAMP:
- Assert.assertEquals(row, fields.get(i).getLongV());
+ assertEquals(row, fields.get(i).getLongV());
break;
case BLOB:
Assert.assertArrayEquals(bytes,
fields.get(i).getBinaryV().getValues());
break;
case STRING:
- Assert.assertEquals("" + row,
fields.get(i).getBinaryV().toString());
+ assertEquals("" + row, fields.get(i).getBinaryV().toString());
break;
default:
fail("Unsupported data type");
@@ -328,11 +334,92 @@ public class SessionIT {
}
row++;
}
- Assert.assertEquals(20, row);
+ assertEquals(20, row);
}
} catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
+
+ @Test
+ public void testSessionMisuse() throws StatementExecutionException,
IoTDBConnectionException {
+ final DataNodeWrapper dataNode =
EnvFactory.getEnv().getDataNodeWrapperList().get(0);
+ final Session session = new
Builder().host(dataNode.getIp()).port(dataNode.getPort()).build();
+ // operate before open
+ try {
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
VALUES (1,1)");
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try (SessionDataSet ignored = session.executeQueryStatement("SELECT * FROM
root.**")) {
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.deleteData("root.ab", 100);
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.insertTablet(new Tablet("root.db1.d1", Collections.emptyList()));
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.deleteDatabase("root.db");
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ // close before open
+ try {
+ session.close();
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ // operate after close
+ session.open();
+ session.close();
+
+ try {
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
VALUES (1,1)");
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try (SessionDataSet ignored = session.executeQueryStatement("SELECT * FROM
root.**")) {
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.deleteData("root.ab", 100);
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.insertTablet(
+ new Tablet(
+ "root.db1.d1",
+ Collections.singletonList(new MeasurementSchema("s1",
TSDataType.INT64))));
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ try {
+ session.deleteDatabase("root.db");
+ } catch (IoTDBConnectionException e) {
+ assertEquals("Session is not open, please invoke Session.open() first",
e.getMessage());
+ }
+
+ // double close is okay
+ session.close();
+ }
}
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 3d6cc6f3754..70d4266d6c8 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -76,11 +76,11 @@ public interface ISession extends AutoCloseable {
void close() throws IoTDBConnectionException;
- String getTimeZone();
+ String getTimeZone() throws IoTDBConnectionException;
void setTimeZone(String zoneId) throws StatementExecutionException,
IoTDBConnectionException;
- void setTimeZoneOfSession(String zoneId);
+ void setTimeZoneOfSession(String zoneId) throws IoTDBConnectionException;
/**
* @deprecated Use {@link #createDatabase(String)} instead.
@@ -244,7 +244,7 @@ public interface ISession extends AutoCloseable {
void insertRecord(String deviceId, long time, List<String> measurements,
List<String> values)
throws IoTDBConnectionException, StatementExecutionException;
- String getTimestampPrecision() throws TException;
+ String getTimestampPrecision() throws TException, IoTDBConnectionException;
void insertAlignedRecord(
String deviceId, long time, List<String> measurements, List<String>
values)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index dc54d5fd772..e1453c8d29d 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -538,14 +538,14 @@ public class Session implements ISession {
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
- defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
- defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ setDefaultSessionConnection(constructSessionConnection(this,
defaultEndPoint, zoneId));
+ getDefaultSessionConnection().setEnableRedirect(enableQueryRedirection);
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
deviceIdToEndpoint = new ConcurrentHashMap<>();
tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
- endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ endPointToSessionConnection.put(defaultEndPoint,
getDefaultSessionConnection());
}
}
@@ -588,14 +588,14 @@ public class Session implements ISession {
this.availableNodes = nodesSupplier;
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
- defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
- defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ setDefaultSessionConnection(constructSessionConnection(this,
defaultEndPoint, zoneId));
+ getDefaultSessionConnection().setEnableRedirect(enableQueryRedirection);
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
this.deviceIdToEndpoint = deviceIdToEndpoint;
this.tableModelDeviceIdToEndpoint = new ConcurrentHashMap<>();
endPointToSessionConnection = new ConcurrentHashMap<>();
- endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ endPointToSessionConnection.put(defaultEndPoint,
getDefaultSessionConnection());
}
}
@@ -614,14 +614,14 @@ public class Session implements ISession {
this.availableNodes = nodesSupplier;
this.enableRPCCompression = enableRPCCompression;
this.connectionTimeoutInMs = connectionTimeoutInMs;
- defaultSessionConnection = constructSessionConnection(this,
defaultEndPoint, zoneId);
- defaultSessionConnection.setEnableRedirect(enableQueryRedirection);
+ setDefaultSessionConnection(constructSessionConnection(this,
defaultEndPoint, zoneId));
+ getDefaultSessionConnection().setEnableRedirect(enableQueryRedirection);
isClosed = false;
if (enableRedirection || enableQueryRedirection) {
this.deviceIdToEndpoint = deviceIdToEndpoint;
this.tableModelDeviceIdToEndpoint = tableModelDeviceIdToEndpoint;
endPointToSessionConnection = new ConcurrentHashMap<>();
- endPointToSessionConnection.put(defaultEndPoint,
defaultSessionConnection);
+ endPointToSessionConnection.put(defaultEndPoint,
getDefaultSessionConnection());
}
}
@@ -635,9 +635,10 @@ public class Session implements ISession {
for (SessionConnection sessionConnection :
endPointToSessionConnection.values()) {
sessionConnection.close();
}
- } else {
- defaultSessionConnection.close();
+ endPointToSessionConnection.clear();
}
+ getDefaultSessionConnection().close();
+ setDefaultSessionConnection(null);
} finally {
// if executorService is null, it means that availableNodes is got from
SessionPool and we
// shouldn't clean that
@@ -669,58 +670,58 @@ public class Session implements ISession {
}
@Override
- public synchronized String getTimeZone() {
- return defaultSessionConnection.getTimeZone();
+ public synchronized String getTimeZone() throws IoTDBConnectionException {
+ return getDefaultSessionConnection().getTimeZone();
}
@Override
public synchronized void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
- defaultSessionConnection.setTimeZone(zoneId);
+ getDefaultSessionConnection().setTimeZone(zoneId);
this.zoneId = ZoneId.of(zoneId);
}
/** Only changes the member variable of the Session object without sending
it to server. */
@Override
- public void setTimeZoneOfSession(String zoneId) {
- defaultSessionConnection.setTimeZoneOfSession(zoneId);
+ public void setTimeZoneOfSession(String zoneId) throws
IoTDBConnectionException {
+ getDefaultSessionConnection().setTimeZoneOfSession(zoneId);
this.zoneId = ZoneId.of(zoneId);
}
@Override
public void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.setStorageGroup(storageGroup);
+ getDefaultSessionConnection().setStorageGroup(storageGroup);
}
@Override
public void deleteStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
-
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup));
+
getDefaultSessionConnection().deleteStorageGroups(Collections.singletonList(storageGroup));
}
@Override
public void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.deleteStorageGroups(storageGroups);
+ getDefaultSessionConnection().deleteStorageGroups(storageGroups);
}
@Override
public void createDatabase(String database)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.setStorageGroup(database);
+ getDefaultSessionConnection().setStorageGroup(database);
}
@Override
public void deleteDatabase(String database)
throws IoTDBConnectionException, StatementExecutionException {
-
defaultSessionConnection.deleteStorageGroups(Collections.singletonList(database));
+
getDefaultSessionConnection().deleteStorageGroups(Collections.singletonList(database));
}
@Override
public void deleteDatabases(List<String> databases)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.deleteStorageGroups(databases);
+ getDefaultSessionConnection().deleteStorageGroups(databases);
}
@Override
@@ -729,7 +730,7 @@ public class Session implements ISession {
throws IoTDBConnectionException, StatementExecutionException {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(path, dataType, encoding, compressor, null,
null, null, null);
- defaultSessionConnection.createTimeseries(request);
+ getDefaultSessionConnection().createTimeseries(request);
}
@Override
@@ -746,7 +747,7 @@ public class Session implements ISession {
TSCreateTimeseriesReq request =
genTSCreateTimeseriesReq(
path, dataType, encoding, compressor, props, tags, attributes,
measurementAlias);
- defaultSessionConnection.createTimeseries(request);
+ getDefaultSessionConnection().createTimeseries(request);
}
private TSCreateTimeseriesReq genTSCreateTimeseriesReq(
@@ -789,7 +790,7 @@ public class Session implements ISession {
measurementAliasList,
null,
null);
- defaultSessionConnection.createAlignedTimeseries(request);
+ getDefaultSessionConnection().createAlignedTimeseries(request);
}
@Override
@@ -813,7 +814,7 @@ public class Session implements ISession {
measurementAliasList,
tagsList,
attributesList);
- defaultSessionConnection.createAlignedTimeseries(request);
+ getDefaultSessionConnection().createAlignedTimeseries(request);
}
private TSCreateAlignedTimeseriesReq getTSCreateAlignedTimeseriesReq(
@@ -859,7 +860,7 @@ public class Session implements ISession {
tagsList,
attributesList,
measurementAliasList);
- defaultSessionConnection.createMultiTimeseries(request);
+ getDefaultSessionConnection().createMultiTimeseries(request);
}
private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(
@@ -904,7 +905,7 @@ public class Session implements ISession {
@Override
public boolean checkTimeseriesExists(String path)
throws IoTDBConnectionException, StatementExecutionException {
- return defaultSessionConnection.checkTimeseriesExists(path,
queryTimeoutInMs);
+ return getDefaultSessionConnection().checkTimeseriesExists(path,
queryTimeoutInMs);
}
@Override
@@ -960,7 +961,7 @@ public class Session implements ISession {
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeQueryStatement(sql,
queryTimeoutInMs);
+ return getDefaultSessionConnection().executeQueryStatement(sql,
queryTimeoutInMs);
} catch (RedirectException redirectException) {
logger.error("{} redirect twice", sql, redirectException);
throw new StatementExecutionException(sql + " redirect twice, please
try again.");
@@ -971,11 +972,11 @@ public class Session implements ISession {
}
}
- private SessionConnection getQuerySessionConnection() {
+ private SessionConnection getQuerySessionConnection() throws
IoTDBConnectionException {
Optional<TEndPoint> endPoint =
availableNodes == null ? Optional.empty() :
availableNodes.getQueryEndPoint();
if (!endPoint.isPresent() || endPointToSessionConnection == null) {
- return defaultSessionConnection;
+ return getDefaultSessionConnection();
}
SessionConnection connection =
endPointToSessionConnection.computeIfAbsent(
@@ -987,7 +988,7 @@ public class Session implements ISession {
return null;
}
});
- return connection == null ? defaultSessionConnection : connection;
+ return connection == null ? getDefaultSessionConnection() : connection;
}
/**
@@ -1000,7 +1001,7 @@ public class Session implements ISession {
throws IoTDBConnectionException, StatementExecutionException {
String previousDB = database;
String previousDialect = sqlDialect;
- defaultSessionConnection.executeNonQueryStatement(sql);
+ getDefaultSessionConnection().executeNonQueryStatement(sql);
if ((!Objects.equals(previousDB, database) ||
!Objects.equals(previousDialect, sqlDialect))
&& endPointToSessionConnection != null) {
Iterator<Map.Entry<TEndPoint, SessionConnection>> iterator =
@@ -1008,7 +1009,7 @@ public class Session implements ISession {
while (iterator.hasNext()) {
Map.Entry<TEndPoint, SessionConnection> entry = iterator.next();
SessionConnection sessionConnection = entry.getValue();
- if (sessionConnection != defaultSessionConnection) {
+ if (sessionConnection != getDefaultSessionConnection()) {
try {
sessionConnection.executeNonQueryStatement(sql);
} catch (Throwable t) {
@@ -1036,13 +1037,14 @@ public class Session implements ISession {
List<String> paths, long startTime, long endTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeRawDataQuery(paths, startTime,
endTime, timeOut);
+ return getDefaultSessionConnection().executeRawDataQuery(paths,
startTime, endTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeRawDataQuery(paths,
startTime, endTime, timeOut);
+ return getDefaultSessionConnection()
+ .executeRawDataQuery(paths, startTime, endTime, timeOut);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1076,13 +1078,13 @@ public class Session implements ISession {
public SessionDataSet executeLastDataQuery(List<String> paths, long
lastTime, long timeOut)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeLastDataQuery(paths, lastTime,
timeOut);
+ return getDefaultSessionConnection().executeLastDataQuery(paths,
lastTime, timeOut);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeLastDataQuery(paths,
lastTime, timeOut);
+ return getDefaultSessionConnection().executeLastDataQuery(paths,
lastTime, timeOut);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1108,7 +1110,7 @@ public class Session implements ISession {
@Override
public SessionDataSet executeFastLastDataQueryForOnePrefixPath(final
List<String> prefixes)
throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
- return
defaultSessionConnection.executeLastDataQueryForOnePrefixPath(prefixes);
+ return
getDefaultSessionConnection().executeLastDataQueryForOnePrefixPath(prefixes);
}
@Override
@@ -1127,13 +1129,15 @@ public class Session implements ISession {
return pair.left;
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(device) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(device));
deviceIdToEndpoint.remove(device);
// reconnect with default connection
- return defaultSessionConnection.executeLastDataQueryForOneDevice(
+ return getDefaultSessionConnection()
+ .executeLastDataQueryForOneDevice(
db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
.left;
} else {
@@ -1147,13 +1151,13 @@ public class Session implements ISession {
List<String> paths, List<TAggregationType> aggregations)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeAggregationQuery(paths,
aggregations);
+ return getDefaultSessionConnection().executeAggregationQuery(paths,
aggregations);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeAggregationQuery(paths,
aggregations);
+ return getDefaultSessionConnection().executeAggregationQuery(paths,
aggregations);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1169,15 +1173,15 @@ public class Session implements ISession {
List<String> paths, List<TAggregationType> aggregations, long startTime,
long endTime)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(paths, aggregations, startTime, endTime);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(paths, aggregations, startTime,
endTime);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1197,15 +1201,15 @@ public class Session implements ISession {
long interval)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime, interval);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(paths, aggregations, startTime, endTime,
interval);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime, interval);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(paths, aggregations, startTime,
endTime, interval);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1226,15 +1230,16 @@ public class Session implements ISession {
long slidingStep)
throws StatementExecutionException, IoTDBConnectionException {
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime, interval, slidingStep);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(paths, aggregations, startTime, endTime,
interval, slidingStep);
} catch (RedirectException e) {
handleQueryRedirection(e.getEndPoint());
if (enableQueryRedirection) {
// retry
try {
- return defaultSessionConnection.executeAggregationQuery(
- paths, aggregations, startTime, endTime, interval, slidingStep);
+ return getDefaultSessionConnection()
+ .executeAggregationQuery(
+ paths, aggregations, startTime, endTime, interval,
slidingStep);
} catch (RedirectException redirectException) {
logger.error(REDIRECT_TWICE, redirectException);
throw new StatementExecutionException(REDIRECT_TWICE_RETRY);
@@ -1288,7 +1293,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertRecord(request);
+ getDefaultSessionConnection().insertRecord(request);
} catch (RedirectException ignored) {
}
} else {
@@ -1312,7 +1317,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertRecord(request);
+ getDefaultSessionConnection().insertRecord(request);
} catch (RedirectException ignored) {
}
} else {
@@ -1321,19 +1326,21 @@ public class Session implements ISession {
}
}
- private SessionConnection getSessionConnection(String deviceId) {
+ private SessionConnection getSessionConnection(String deviceId) throws
IoTDBConnectionException {
TEndPoint endPoint;
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& (endPoint = deviceIdToEndpoint.get(deviceId)) != null
&& endPointToSessionConnection.containsKey(endPoint)) {
return endPointToSessionConnection.get(endPoint);
} else {
- return defaultSessionConnection;
+ return getDefaultSessionConnection();
}
}
- private SessionConnection getSessionConnection(IDeviceID deviceId) {
+ private SessionConnection getSessionConnection(IDeviceID deviceId)
+ throws IoTDBConnectionException {
TEndPoint endPoint;
if (enableRedirection
&& tableModelDeviceIdToEndpoint != null
@@ -1341,13 +1348,13 @@ public class Session implements ISession {
&& endPointToSessionConnection.containsKey(endPoint)) {
return endPointToSessionConnection.get(endPoint);
} else {
- return defaultSessionConnection;
+ return getDefaultSessionConnection();
}
}
@Override
- public String getTimestampPrecision() throws TException {
- return
defaultSessionConnection.getClient().getProperties().getTimestampPrecision();
+ public String getTimestampPrecision() throws TException,
IoTDBConnectionException {
+ return
getDefaultSessionConnection().getClient().getProperties().getTimestampPrecision();
}
// TODO https://issues.apache.org/jira/browse/IOTDB-1399
@@ -1461,7 +1468,7 @@ public class Session implements ISession {
if (connection == null) {
throw new IoTDBConnectionException(exceptionReference.get());
}
- defaultSessionConnection = connection;
+ setDefaultSessionConnection(connection);
}
}
@@ -1665,7 +1672,7 @@ public class Session implements ISession {
return;
}
try {
- defaultSessionConnection.insertRecords(request);
+ getDefaultSessionConnection().insertRecords(request);
} catch (RedirectException ignored) {
}
}
@@ -1880,7 +1887,7 @@ public class Session implements ISession {
}
try {
- defaultSessionConnection.insertRecords(request);
+ getDefaultSessionConnection().insertRecords(request);
} catch (RedirectException ignored) {
}
}
@@ -2066,7 +2073,7 @@ public class Session implements ISession {
return;
}
try {
- defaultSessionConnection.insertRecords(request);
+ getDefaultSessionConnection().insertRecords(request);
} catch (RedirectException ignored) {
}
}
@@ -2119,7 +2126,7 @@ public class Session implements ISession {
return;
}
try {
- defaultSessionConnection.insertRecords(request);
+ getDefaultSessionConnection().insertRecords(request);
} catch (RedirectException ignored) {
}
}
@@ -2189,6 +2196,7 @@ public class Session implements ISession {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
@@ -2196,7 +2204,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertRecordsOfOneDevice(request);
+ getDefaultSessionConnection().insertRecordsOfOneDevice(request);
} catch (RedirectException ignored) {
}
} else {
@@ -2242,6 +2250,7 @@ public class Session implements ISession {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
@@ -2249,7 +2258,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertStringRecordsOfOneDevice(req);
+ getDefaultSessionConnection().insertStringRecordsOfOneDevice(req);
} catch (RedirectException ignored) {
}
} else {
@@ -2342,6 +2351,7 @@ public class Session implements ISession {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
@@ -2349,7 +2359,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertRecordsOfOneDevice(request);
+ getDefaultSessionConnection().insertRecordsOfOneDevice(request);
} catch (RedirectException ignored) {
}
} else {
@@ -2395,6 +2405,7 @@ public class Session implements ISession {
handleRedirection(deviceId, e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(deviceId) != null) {
logger.warn(SESSION_CANNOT_CONNECT, deviceIdToEndpoint.get(deviceId));
@@ -2402,7 +2413,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertStringRecordsOfOneDevice(req);
+ getDefaultSessionConnection().insertStringRecordsOfOneDevice(req);
} catch (RedirectException ignored) {
}
} else {
@@ -2718,6 +2729,7 @@ public class Session implements ISession {
handleRedirection(tablet.getDeviceId(), e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(tablet.getDeviceId()) != null) {
logger.warn(SESSION_CANNOT_CONNECT,
deviceIdToEndpoint.get(tablet.getDeviceId()));
@@ -2725,7 +2737,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertTablet(request);
+ getDefaultSessionConnection().insertTablet(request);
} catch (RedirectException ignored) {
}
} else {
@@ -2755,7 +2767,7 @@ public class Session implements ISession {
.map(t -> (byte) t.ordinal())
.collect(Collectors.toList()));
try {
- defaultSessionConnection.insertTablet(request);
+ getDefaultSessionConnection().insertTablet(request);
} catch (RedirectException ignored) {
}
}
@@ -2765,7 +2777,7 @@ public class Session implements ISession {
throws IoTDBConnectionException, StatementExecutionException {
Map<SessionConnection, Tablet> relationalTabletGroup = new HashMap<>();
if (tableModelDeviceIdToEndpoint.isEmpty()) {
- relationalTabletGroup.put(defaultSessionConnection, tablet);
+ relationalTabletGroup.put(getDefaultSessionConnection(), tablet);
} else if (SessionUtils.isTabletContainsSingleDevice(tablet)) {
relationalTabletGroup.put(getSessionConnection(tablet.getDeviceID(0)),
tablet);
} else {
@@ -2837,7 +2849,7 @@ public class Session implements ISession {
// remove the broken session
removeBrokenSessionConnection(connection);
try {
- defaultSessionConnection.insertTablet(request);
+ getDefaultSessionConnection().insertTablet(request);
} catch (RedirectException ignored) {
}
} else {
@@ -2884,7 +2896,7 @@ public class Session implements ISession {
// remove the broken session
removeBrokenSessionConnection(connection);
try {
- insertConsumer.insert(defaultSessionConnection,
request);
+
insertConsumer.insert(getDefaultSessionConnection(), request);
} catch (IoTDBConnectionException |
StatementExecutionException ex) {
throw new CompletionException(ex);
} catch (RedirectException ignored) {
@@ -2949,6 +2961,7 @@ public class Session implements ISession {
handleRedirection(tablet.getDeviceId(), e.getEndPoint());
} catch (IoTDBConnectionException e) {
if (enableRedirection
+ && deviceIdToEndpoint != null
&& !deviceIdToEndpoint.isEmpty()
&& deviceIdToEndpoint.get(tablet.getDeviceId()) != null) {
logger.warn(SESSION_CANNOT_CONNECT,
deviceIdToEndpoint.get(tablet.getDeviceId()));
@@ -2956,7 +2969,7 @@ public class Session implements ISession {
// reconnect with default connection
try {
- defaultSessionConnection.insertTablet(request);
+ getDefaultSessionConnection().insertTablet(request);
} catch (RedirectException ignored) {
}
} else {
@@ -3018,7 +3031,7 @@ public class Session implements ISession {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted,
false);
try {
- defaultSessionConnection.insertTablets(request);
+ getDefaultSessionConnection().insertTablets(request);
} catch (RedirectException ignored) {
}
}
@@ -3054,7 +3067,7 @@ public class Session implements ISession {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted,
true);
try {
- defaultSessionConnection.insertTablets(request);
+ getDefaultSessionConnection().insertTablets(request);
} catch (RedirectException ignored) {
}
}
@@ -3328,7 +3341,7 @@ public class Session implements ISession {
public void testInsertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
- defaultSessionConnection.testInsertTablet(request);
+ getDefaultSessionConnection().testInsertTablet(request);
}
/**
@@ -3350,7 +3363,7 @@ public class Session implements ISession {
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletsReq request =
genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted,
false);
- defaultSessionConnection.testInsertTablets(request);
+ getDefaultSessionConnection().testInsertTablets(request);
}
/**
@@ -3374,7 +3387,7 @@ public class Session implements ISession {
return;
}
- defaultSessionConnection.testInsertRecords(request);
+ getDefaultSessionConnection().testInsertRecords(request);
}
/**
@@ -3399,7 +3412,7 @@ public class Session implements ISession {
return;
}
- defaultSessionConnection.testInsertRecords(request);
+ getDefaultSessionConnection().testInsertRecords(request);
}
/**
@@ -3417,7 +3430,7 @@ public class Session implements ISession {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
- defaultSessionConnection.testInsertRecord(request);
+ getDefaultSessionConnection().testInsertRecord(request);
}
/**
@@ -3439,7 +3452,7 @@ public class Session implements ISession {
logger.warn(ALL_VALUES_ARE_NULL, deviceId, time, measurements);
return;
}
- defaultSessionConnection.testInsertRecord(request);
+ getDefaultSessionConnection().testInsertRecord(request);
}
/**
@@ -3450,7 +3463,7 @@ public class Session implements ISession {
@Override
public void deleteTimeseries(String path)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.deleteTimeseries(Collections.singletonList(path));
+
getDefaultSessionConnection().deleteTimeseries(Collections.singletonList(path));
}
/**
@@ -3461,7 +3474,7 @@ public class Session implements ISession {
@Override
public void deleteTimeseries(List<String> paths)
throws IoTDBConnectionException, StatementExecutionException {
- defaultSessionConnection.deleteTimeseries(paths);
+ getDefaultSessionConnection().deleteTimeseries(paths);
}
/**
@@ -3499,7 +3512,7 @@ public class Session implements ISession {
public void deleteData(List<String> paths, long startTime, long endTime)
throws IoTDBConnectionException, StatementExecutionException {
TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime);
- defaultSessionConnection.deleteData(request);
+ getDefaultSessionConnection().deleteData(request);
}
private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long
startTime, long endTime) {
@@ -3663,7 +3676,7 @@ public class Session implements ISession {
public void setSchemaTemplate(String templateName, String prefixPath)
throws IoTDBConnectionException, StatementExecutionException {
TSSetSchemaTemplateReq request = getTSSetSchemaTemplateReq(templateName,
prefixPath);
- defaultSessionConnection.setSchemaTemplate(request);
+ getDefaultSessionConnection().setSchemaTemplate(request);
}
/**
@@ -3689,7 +3702,7 @@ public class Session implements ISession {
template.serialize(baos);
req.setSerializedTemplate(baos.toByteArray());
baos.close();
- defaultSessionConnection.createSchemaTemplate(req);
+ getDefaultSessionConnection().createSchemaTemplate(req);
}
/**
@@ -3795,7 +3808,7 @@ public class Session implements ISession {
req.setCompressors(
compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
req.setIsAligned(true);
- defaultSessionConnection.appendSchemaTemplate(req);
+ getDefaultSessionConnection().appendSchemaTemplate(req);
}
/**
@@ -3818,7 +3831,7 @@ public class Session implements ISession {
req.setEncodings(Collections.singletonList(encoding.ordinal()));
req.setCompressors(Collections.singletonList((int)
compressor.serialize()));
req.setIsAligned(true);
- defaultSessionConnection.appendSchemaTemplate(req);
+ getDefaultSessionConnection().appendSchemaTemplate(req);
}
/**
@@ -3841,7 +3854,7 @@ public class Session implements ISession {
req.setCompressors(
compressors.stream().map(i -> (int)
i.serialize()).collect(Collectors.toList()));
req.setIsAligned(false);
- defaultSessionConnection.appendSchemaTemplate(req);
+ getDefaultSessionConnection().appendSchemaTemplate(req);
}
/**
@@ -3863,7 +3876,7 @@ public class Session implements ISession {
req.setEncodings(Collections.singletonList(encoding.ordinal()));
req.setCompressors(Collections.singletonList((int)
compressor.serialize()));
req.setIsAligned(false);
- defaultSessionConnection.appendSchemaTemplate(req);
+ getDefaultSessionConnection().appendSchemaTemplate(req);
}
/**
@@ -3876,7 +3889,7 @@ public class Session implements ISession {
TSPruneSchemaTemplateReq req = new TSPruneSchemaTemplateReq();
req.setName(templateName);
req.setPath(path);
- defaultSessionConnection.pruneSchemaTemplate(req);
+ getDefaultSessionConnection().pruneSchemaTemplate(req);
}
/**
@@ -3888,7 +3901,7 @@ public class Session implements ISession {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(name);
req.setQueryType(TemplateQueryType.COUNT_MEASUREMENTS.ordinal());
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getCount();
}
@@ -3902,7 +3915,7 @@ public class Session implements ISession {
req.setName(templateName);
req.setQueryType(TemplateQueryType.IS_MEASUREMENT.ordinal());
req.setMeasurement(path);
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.result;
}
@@ -3916,7 +3929,7 @@ public class Session implements ISession {
req.setName(templateName);
req.setQueryType(TemplateQueryType.PATH_EXIST.ordinal());
req.setMeasurement(path);
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.result;
}
@@ -3930,7 +3943,7 @@ public class Session implements ISession {
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
req.setMeasurement("");
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getMeasurements();
}
@@ -3944,7 +3957,7 @@ public class Session implements ISession {
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_MEASUREMENTS.ordinal());
req.setMeasurement(pattern);
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getMeasurements();
}
@@ -3957,7 +3970,7 @@ public class Session implements ISession {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName("");
req.setQueryType(TemplateQueryType.SHOW_TEMPLATES.ordinal());
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getMeasurements();
}
@@ -3970,7 +3983,7 @@ public class Session implements ISession {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_SET_TEMPLATES.ordinal());
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getMeasurements();
}
@@ -3983,7 +3996,7 @@ public class Session implements ISession {
TSQueryTemplateReq req = new TSQueryTemplateReq();
req.setName(templateName);
req.setQueryType(TemplateQueryType.SHOW_USING_TEMPLATES.ordinal());
- TSQueryTemplateResp resp =
defaultSessionConnection.querySchemaTemplate(req);
+ TSQueryTemplateResp resp =
getDefaultSessionConnection().querySchemaTemplate(req);
return resp.getMeasurements();
}
@@ -3991,14 +4004,14 @@ public class Session implements ISession {
public void unsetSchemaTemplate(String prefixPath, String templateName)
throws IoTDBConnectionException, StatementExecutionException {
TSUnsetSchemaTemplateReq request = getTSUnsetSchemaTemplateReq(prefixPath,
templateName);
- defaultSessionConnection.unsetSchemaTemplate(request);
+ getDefaultSessionConnection().unsetSchemaTemplate(request);
}
@Override
public void dropSchemaTemplate(String templateName)
throws IoTDBConnectionException, StatementExecutionException {
TSDropSchemaTemplateReq request = getTSDropSchemaTemplateReq(templateName);
- defaultSessionConnection.dropSchemaTemplate(request);
+ getDefaultSessionConnection().dropSchemaTemplate(request);
}
private TSSetSchemaTemplateReq getTSSetSchemaTemplateReq(String
templateName, String prefixPath) {
@@ -4036,7 +4049,7 @@ public class Session implements ISession {
}
TCreateTimeseriesUsingSchemaTemplateReq request = new
TCreateTimeseriesUsingSchemaTemplateReq();
request.setDevicePathList(devicePathList);
- defaultSessionConnection.createTimeseriesUsingSchemaTemplate(request);
+ getDefaultSessionConnection().createTimeseriesUsingSchemaTemplate(request);
}
private <T> void insertOnce(
@@ -4054,7 +4067,7 @@ public class Session implements ISession {
// remove the broken session
removeBrokenSessionConnection(connection);
try {
- insertConsumer.insert(defaultSessionConnection, insertReq);
+ insertConsumer.insert(getDefaultSessionConnection(), insertReq);
} catch (RedirectException ignored) {
}
} else {
@@ -4097,7 +4110,7 @@ public class Session implements ISession {
// remove the broken session
removeBrokenSessionConnection(connection);
try {
- insertConsumer.insert(defaultSessionConnection,
insertReq);
+
insertConsumer.insert(getDefaultSessionConnection(), insertReq);
} catch (IoTDBConnectionException |
StatementExecutionException ex) {
throw new CompletionException(ex);
} catch (RedirectException ignored) {
@@ -4153,12 +4166,12 @@ public class Session implements ISession {
@Override
public TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
- return defaultSessionConnection.getBackupConfiguration();
+ return getDefaultSessionConnection().getBackupConfiguration();
}
@Override
public TSConnectionInfoResp fetchAllConnections() throws
IoTDBConnectionException {
- return defaultSessionConnection.fetchAllConnections();
+ return getDefaultSessionConnection().fetchAllConnections();
}
protected void changeDatabase(String database) {
@@ -4179,6 +4192,17 @@ public class Session implements ISession {
return sqlDialect;
}
+ protected SessionConnection getDefaultSessionConnection() throws
IoTDBConnectionException {
+ if (defaultSessionConnection == null) {
+ throw new IoTDBConnectionException("Session is not open, please invoke
Session.open() first");
+ }
+ return defaultSessionConnection;
+ }
+
+ protected void setDefaultSessionConnection(SessionConnection
defaultSessionConnection) {
+ this.defaultSessionConnection = defaultSessionConnection;
+ }
+
public static class Builder extends AbstractSessionBuilder {
public Builder host(String host) {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 68947283ce2..477eba234eb 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -275,6 +275,10 @@ public class SessionConnection {
}
public void close() throws IoTDBConnectionException {
+ if (!transport.isOpen()) {
+ return;
+ }
+
TSCloseSessionReq req = new TSCloseSessionReq(sessionId);
try {
client.closeSession(req);
@@ -1090,7 +1094,7 @@ public class SessionConnection {
// remove the broken end point
session.removeBrokenSessionConnection(this);
session.defaultEndPoint = this.endPoint;
- session.defaultSessionConnection = this;
+ session.setDefaultSessionConnection(this);
if (session.endPointToSessionConnection == null) {
session.endPointToSessionConnection = new ConcurrentHashMap<>();
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
index 5ab42f29d70..6683a1c5adf 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/AbstractSubscriptionSession.java
@@ -46,7 +46,7 @@ abstract class AbstractSubscriptionSession {
this.session = session;
}
- public SubscriptionSessionConnection getSessionConnection() {
+ public SubscriptionSessionConnection getSessionConnection() throws
IoTDBConnectionException {
return session.getSessionConnection();
}
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
index 455e9082ee6..ec5a4d6cc99 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/SubscriptionSessionWrapper.java
@@ -62,8 +62,8 @@ public final class SubscriptionSessionWrapper extends Session
{
database);
}
- public SubscriptionSessionConnection getSessionConnection() {
- return (SubscriptionSessionConnection) defaultSessionConnection;
+ public SubscriptionSessionConnection getSessionConnection() throws
IoTDBConnectionException {
+ return (SubscriptionSessionConnection) getDefaultSessionConnection();
}
public int getThriftMaxFrameSize() {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
index df5dfcfd7f7..4a9395a1b0b 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionProvider.java
@@ -116,7 +116,7 @@ public abstract class AbstractSubscriptionProvider {
this.password = password;
}
- SubscriptionSessionConnection getSessionConnection() {
+ SubscriptionSessionConnection getSessionConnection() throws
IoTDBConnectionException {
return session.getSessionConnection();
}
@@ -191,10 +191,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(req);
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} handshake with
request {}, set SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} handshake with request {}, set SubscriptionProvider
unavailable",
this,
consumerConfig,
e);
@@ -223,10 +223,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp =
getSessionConnection().pipeSubscribe(PipeSubscribeCloseReq.toTPipeSubscribeReq());
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} close, set
SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} close, set SubscriptionProvider unavailable",
this,
e);
setUnavailable();
@@ -241,10 +241,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp =
getSessionConnection().pipeSubscribe(PipeSubscribeHeartbeatReq.toTPipeSubscribeReq());
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} heartbeat, set
SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} heartbeat, set SubscriptionProvider unavailable",
this,
e);
setUnavailable();
@@ -269,10 +269,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(req);
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} subscribe with
request {}, set SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} subscribe with request {}, set SubscriptionProvider
unavailable",
this,
topicNames,
e);
@@ -300,10 +300,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(req);
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} unsubscribe with
request {}, set SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} unsubscribe with request {}, set SubscriptionProvider
unavailable",
this,
topicNames,
e);
@@ -364,10 +364,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(req);
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} poll with request
{}, set SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} poll with request {}, set SubscriptionProvider
unavailable",
this,
pollMessage,
e);
@@ -395,10 +395,10 @@ public abstract class AbstractSubscriptionProvider {
final TPipeSubscribeResp resp;
try {
resp = getSessionConnection().pipeSubscribe(req);
- } catch (final TException e) {
+ } catch (final TException | IoTDBConnectionException e) {
// Assume provider unavailable
LOGGER.warn(
- "TException occurred when SubscriptionProvider {} commit with
request {}, set SubscriptionProvider unavailable",
+ "TException/IoTDBConnectionException occurred when
SubscriptionProvider {} commit with request {}, set SubscriptionProvider
unavailable",
this,
subscriptionCommitContexts,
e);