This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 72e69aee1f9 [Refactor] Reduce redundancy by introducing consistent
functional interface in SessionConnection (#14212)
72e69aee1f9 is described below
commit 72e69aee1f933f4019ac8e01d3c563fafea57990
Author: William Song <[email protected]>
AuthorDate: Mon Dec 2 21:16:17 2024 +0800
[Refactor] Reduce redundancy by introducing consistent functional interface
in SessionConnection (#14212)
* refactor session connect to reduce redundency
* refactor session connect to reduce redundency
* mvn spotless
* fix small error
---
.../apache/iotdb/session/SessionConnection.java | 374 ++++++---------------
.../apache/iotdb/session/util/CheckedSupplier.java | 32 ++
2 files changed, 132 insertions(+), 274 deletions(-)
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index e6f55f39286..c0dc516ba11 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -63,6 +63,7 @@ import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
+import org.apache.iotdb.session.util.CheckedSupplier;
import org.apache.iotdb.session.util.SessionUtils;
import org.apache.thrift.TException;
@@ -283,23 +284,12 @@ public class SessionConnection {
protected void setTimeZone(String zoneId)
throws StatementExecutionException, IoTDBConnectionException {
- TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
- TSStatus resp;
- try {
- resp = client.setTimeZone(req);
- } catch (TException e) {
- if (reconnect()) {
- try {
- req.setSessionId(sessionId);
- resp = client.setTimeZone(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
- RpcUtils.verifySuccess(resp);
+ doOperation(
+ () -> {
+ TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId);
+ RpcUtils.verifySuccess(client.setTimeZone(req));
+ return null;
+ });
setTimeZoneOfSession(zoneId);
}
@@ -316,93 +306,50 @@ public class SessionConnection {
protected void setStorageGroup(String storageGroup)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
RpcUtils.verifySuccess(client.setStorageGroup(sessionId,
storageGroup));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void deleteStorageGroups(List<String> storageGroups)
throws IoTDBConnectionException, StatementExecutionException {
- try {
- RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId,
storageGroups));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId,
storageGroups));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void createTimeseries(TSCreateTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.createTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void createAlignedTimeseries(TSCreateAlignedTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.createAlignedTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createMultiTimeseries(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.createMultiTimeseries(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected boolean checkTimeseriesExists(String path, long timeout)
@@ -1315,116 +1262,62 @@ public class SessionConnection {
protected void testInsertRecord(TSInsertStringRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertStringRecord(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertStringRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void testInsertRecord(TSInsertRecordReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertRecord(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertRecord(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
public void testInsertRecords(TSInsertStringRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertStringRecords(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertStringRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
public void testInsertRecords(TSInsertRecordsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertRecords(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertRecords(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void testInsertTablet(TSInsertTabletReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertTablet(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertTablet(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void testInsertTablets(TSInsertTabletsReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.testInsertTablets(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.testInsertTablets(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
@SuppressWarnings({
@@ -1477,172 +1370,105 @@ public class SessionConnection {
protected void createSchemaTemplate(TSCreateSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.createSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.createSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void appendSchemaTemplate(TSAppendSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.appendSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void pruneSchemaTemplate(TSPruneSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.pruneSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req)
throws StatementExecutionException, IoTDBConnectionException {
- TSQueryTemplateResp execResp;
- req.setSessionId(sessionId);
- try {
- execResp = client.querySchemaTemplate(req);
- RpcUtils.verifySuccess(execResp.getStatus());
- } catch (TException e) {
- if (reconnect()) {
- try {
- execResp = client.querySchemaTemplate(req);
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
-
- RpcUtils.verifySuccess(execResp.getStatus());
- return execResp;
+ return doOperation(
+ () -> {
+ req.setSessionId(sessionId);
+ TSQueryTemplateResp execResp = client.querySchemaTemplate(req);
+ RpcUtils.verifySuccess(execResp.getStatus());
+ return execResp;
+ });
}
protected void setSchemaTemplate(TSSetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.setSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.setSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void unsetSchemaTemplate(TSUnsetSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.unsetSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void dropSchemaTemplate(TSDropSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
- RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.dropSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(logForReconnectionFailure());
- }
- }
+ return null;
+ });
}
protected void createTimeseriesUsingSchemaTemplate(
TCreateTimeseriesUsingSchemaTemplateReq request)
throws IoTDBConnectionException, StatementExecutionException {
- request.setSessionId(sessionId);
- try {
-
RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
- } catch (TException e) {
- if (reconnect()) {
- try {
+ doOperation(
+ () -> {
request.setSessionId(sessionId);
RpcUtils.verifySuccess(client.createTimeseriesUsingSchemaTemplate(request));
- } catch (TException tException) {
- throw new IoTDBConnectionException(tException);
- }
- } else {
- throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL);
- }
- }
+ return null;
+ });
}
protected TSBackupConfigurationResp getBackupConfiguration()
throws IoTDBConnectionException, StatementExecutionException {
- TSBackupConfigurationResp execResp;
+ return doOperation(
+ () -> {
+ TSBackupConfigurationResp execResp = client.getBackupConfiguration();
+ RpcUtils.verifySuccess(execResp.getStatus());
+ return execResp;
+ });
+ }
+
+ private <RETURN> RETURN doOperation(CheckedSupplier<RETURN, TException>
supplier)
+ throws IoTDBConnectionException, StatementExecutionException {
+ RETURN ret;
try {
- execResp = client.getBackupConfiguration();
- RpcUtils.verifySuccess(execResp.getStatus());
+ ret = supplier.get();
} catch (TException e) {
if (reconnect()) {
try {
- execResp = client.getBackupConfiguration();
- RpcUtils.verifySuccess(execResp.getStatus());
+ ret = supplier.get();
} catch (TException tException) {
throw new IoTDBConnectionException(tException);
}
@@ -1650,7 +1476,7 @@ public class SessionConnection {
throw new IoTDBConnectionException(logForReconnectionFailure());
}
}
- return execResp;
+ return ret;
}
public TSConnectionInfoResp fetchAllConnections() throws
IoTDBConnectionException {
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
new file mode 100644
index 00000000000..93949524afb
--- /dev/null
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/util/CheckedSupplier.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.session.util;
+
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+/** Supplier with a throws-clause. */
+@FunctionalInterface
+public interface CheckedSupplier<OUTPUT, THROWABLE extends Throwable> {
+ /**
+ * The same as {@link java.util.function.Supplier#get()} except that this
method is declared with
+ * a throws-clause.
+ */
+ OUTPUT get() throws THROWABLE, StatementExecutionException;
+}