This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch HighAvailability in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 44bb9b871c4a11a3bee1d59e1fd34d72e7aef2bb Author: JackieTien97 <[email protected]> AuthorDate: Mon Oct 23 16:58:22 2023 +0800 retry after one DataNode is down --- .../main/java/org/apache/iotdb/SessionExample.java | 104 ++++++++++----------- .../apache/iotdb/commons/client/ThriftClient.java | 7 +- 2 files changed, 57 insertions(+), 54 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 60bf282ed8f..cc040fb93ea 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -26,7 +26,6 @@ import org.apache.iotdb.isession.template.Template; import org.apache.iotdb.isession.util.Version; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.session.Session; import org.apache.iotdb.session.template.MeasurementNode; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; @@ -78,48 +77,48 @@ public class SessionExample { // set session fetchSize session.setFetchSize(10000); - try { - session.createDatabase("root.sg1"); - } catch (StatementExecutionException e) { - if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { - throw e; - } - } + // try { + // session.createDatabase("root.sg1"); + // } catch (StatementExecutionException e) { + // if (e.getStatusCode() != TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) { + // throw e; + // } + // } // createTemplate(); - createTimeseries(); - createMultiTimeseries(); - insertRecord(); - insertTablet(); + // createTimeseries(); + // createMultiTimeseries(); + // insertRecord(); + // insertTablet(); // insertTabletWithNullValues(); // insertTablets(); - // insertRecords(); + insertRecords(); // insertText(); // selectInto(); // createAndDropContinuousQueries(); // nonQuery(); - query(); + // query(); // queryWithTimeout(); - rawDataQuery(); - lastDataQuery(); - aggregationQuery(); - groupByQuery(); + // rawDataQuery(); + // lastDataQuery(); + // aggregationQuery(); + // groupByQuery(); // queryByIterator(); // deleteData(); // deleteTimeseries(); // setTimeout(); - sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); - sessionEnableRedirect.setEnableQueryRedirection(true); - sessionEnableRedirect.open(false); + // sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root"); + // sessionEnableRedirect.setEnableQueryRedirection(true); + // sessionEnableRedirect.open(false); // set session fetchSize - sessionEnableRedirect.setFetchSize(10000); + // sessionEnableRedirect.setFetchSize(10000); - fastLastDataQueryForOneDevice(); - insertRecord4Redirect(); - query4Redirect(); - sessionEnableRedirect.close(); + // fastLastDataQueryForOneDevice(); + // insertRecord4Redirect(); + // query4Redirect(); + // sessionEnableRedirect.close(); session.close(); } @@ -336,41 +335,42 @@ public class SessionExample { } private static void insertRecords() throws IoTDBConnectionException, StatementExecutionException { - String deviceId = ROOT_SG1_D1; List<String> measurements = new ArrayList<>(); measurements.add("s1"); measurements.add("s2"); measurements.add("s3"); List<String> deviceIds = new ArrayList<>(); + deviceIds.add("root.db.d1"); + deviceIds.add("root.db.d2"); + deviceIds.add("root.db.d3"); + List<TSDataType> types = new ArrayList<>(); + types.add(TSDataType.FLOAT); + types.add(TSDataType.FLOAT); + types.add(TSDataType.FLOAT); + List<Object> values = new ArrayList<>(); + values.add(2.0f); + values.add(2.0f); + values.add(2.0f); + List<List<String>> measurementsList = new ArrayList<>(); + measurementsList.add(measurements); + measurementsList.add(measurements); + measurementsList.add(measurements); + List<List<Object>> valuesList = new ArrayList<>(); + valuesList.add(values); + valuesList.add(values); + valuesList.add(values); + List<Long> timestamps = new ArrayList<>(); - List<List<TSDataType>> typesList = new ArrayList<>(); + timestamps.add(2L); + timestamps.add(2L); + timestamps.add(2L); - for (long time = 0; time < 500; time++) { - List<Object> values = new ArrayList<>(); - List<TSDataType> types = new ArrayList<>(); - values.add(1L); - values.add(2L); - values.add(3L); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - types.add(TSDataType.INT64); - - deviceIds.add(deviceId); - measurementsList.add(measurements); - valuesList.add(values); - typesList.add(types); - timestamps.add(time); - if (time != 0 && time % 100 == 0) { - session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); - deviceIds.clear(); - measurementsList.clear(); - valuesList.clear(); - typesList.clear(); - timestamps.clear(); - } - } + List<List<TSDataType>> typesList = new ArrayList<>(); + typesList.add(types); + typesList.add(types); + typesList.add(types); session.insertRecords(deviceIds, timestamps, measurementsList, typesList, valuesList); } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java index f57015cfab3..a93fe75f2b9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java @@ -25,6 +25,7 @@ import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.lang.reflect.InvocationTargetException; import java.net.SocketException; @@ -102,9 +103,11 @@ public interface ThriftClient { * @param cause Throwable * @return true/false */ - public static boolean isConnectionBroken(Throwable cause) { + static boolean isConnectionBroken(Throwable cause) { return (cause instanceof SocketException && cause.getMessage().contains("Broken pipe")) || (cause instanceof TTransportException - && cause.getMessage().contains("Socket is closed by peer")); + && cause.getMessage().contains("Socket is closed by peer")) + || (cause instanceof IOException + && cause.getMessage().contains("Connection reset by peer")); } }
