This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch HighAvailability
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 44bb9b871c4a11a3bee1d59e1fd34d72e7aef2bb
Author: JackieTien97 <[email protected]>
AuthorDate: Mon Oct 23 16:58:22 2023 +0800

    retry after one DataNode is down
---
 .../main/java/org/apache/iotdb/SessionExample.java | 104 ++++++++++-----------
 .../apache/iotdb/commons/client/ThriftClient.java  |   7 +-
 2 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java 
b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
index 60bf282ed8f..cc040fb93ea 100644
--- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java
+++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java
@@ -26,7 +26,6 @@ import org.apache.iotdb.isession.template.Template;
 import org.apache.iotdb.isession.util.Version;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
-import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.session.Session;
 import org.apache.iotdb.session.template.MeasurementNode;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -78,48 +77,48 @@ public class SessionExample {
     // set session fetchSize
     session.setFetchSize(10000);
 
-    try {
-      session.createDatabase("root.sg1");
-    } catch (StatementExecutionException e) {
-      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
-        throw e;
-      }
-    }
+    //    try {
+    //      session.createDatabase("root.sg1");
+    //    } catch (StatementExecutionException e) {
+    //      if (e.getStatusCode() != 
TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode()) {
+    //        throw e;
+    //      }
+    //    }
 
     //     createTemplate();
-    createTimeseries();
-    createMultiTimeseries();
-    insertRecord();
-    insertTablet();
+    //    createTimeseries();
+    //    createMultiTimeseries();
+    //    insertRecord();
+    //    insertTablet();
     //    insertTabletWithNullValues();
     //    insertTablets();
-    //    insertRecords();
+    insertRecords();
     //    insertText();
     //    selectInto();
     //    createAndDropContinuousQueries();
     //    nonQuery();
-    query();
+    //    query();
     //    queryWithTimeout();
-    rawDataQuery();
-    lastDataQuery();
-    aggregationQuery();
-    groupByQuery();
+    //    rawDataQuery();
+    //    lastDataQuery();
+    //    aggregationQuery();
+    //    groupByQuery();
     //    queryByIterator();
     //    deleteData();
     //    deleteTimeseries();
     //    setTimeout();
 
-    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", "root");
-    sessionEnableRedirect.setEnableQueryRedirection(true);
-    sessionEnableRedirect.open(false);
+    //    sessionEnableRedirect = new Session(LOCAL_HOST, 6667, "root", 
"root");
+    //    sessionEnableRedirect.setEnableQueryRedirection(true);
+    //    sessionEnableRedirect.open(false);
 
     // set session fetchSize
-    sessionEnableRedirect.setFetchSize(10000);
+    //    sessionEnableRedirect.setFetchSize(10000);
 
-    fastLastDataQueryForOneDevice();
-    insertRecord4Redirect();
-    query4Redirect();
-    sessionEnableRedirect.close();
+    //    fastLastDataQueryForOneDevice();
+    //    insertRecord4Redirect();
+    //    query4Redirect();
+    //    sessionEnableRedirect.close();
     session.close();
   }
 
@@ -336,41 +335,42 @@ public class SessionExample {
   }
 
   private static void insertRecords() throws IoTDBConnectionException, 
StatementExecutionException {
-    String deviceId = ROOT_SG1_D1;
     List<String> measurements = new ArrayList<>();
     measurements.add("s1");
     measurements.add("s2");
     measurements.add("s3");
     List<String> deviceIds = new ArrayList<>();
+    deviceIds.add("root.db.d1");
+    deviceIds.add("root.db.d2");
+    deviceIds.add("root.db.d3");
+    List<TSDataType> types = new ArrayList<>();
+    types.add(TSDataType.FLOAT);
+    types.add(TSDataType.FLOAT);
+    types.add(TSDataType.FLOAT);
+    List<Object> values = new ArrayList<>();
+    values.add(2.0f);
+    values.add(2.0f);
+    values.add(2.0f);
+
     List<List<String>> measurementsList = new ArrayList<>();
+    measurementsList.add(measurements);
+    measurementsList.add(measurements);
+    measurementsList.add(measurements);
+
     List<List<Object>> valuesList = new ArrayList<>();
+    valuesList.add(values);
+    valuesList.add(values);
+    valuesList.add(values);
+
     List<Long> timestamps = new ArrayList<>();
-    List<List<TSDataType>> typesList = new ArrayList<>();
+    timestamps.add(2L);
+    timestamps.add(2L);
+    timestamps.add(2L);
 
-    for (long time = 0; time < 500; time++) {
-      List<Object> values = new ArrayList<>();
-      List<TSDataType> types = new ArrayList<>();
-      values.add(1L);
-      values.add(2L);
-      values.add(3L);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-      types.add(TSDataType.INT64);
-
-      deviceIds.add(deviceId);
-      measurementsList.add(measurements);
-      valuesList.add(values);
-      typesList.add(types);
-      timestamps.add(time);
-      if (time != 0 && time % 100 == 0) {
-        session.insertRecords(deviceIds, timestamps, measurementsList, 
typesList, valuesList);
-        deviceIds.clear();
-        measurementsList.clear();
-        valuesList.clear();
-        typesList.clear();
-        timestamps.clear();
-      }
-    }
+    List<List<TSDataType>> typesList = new ArrayList<>();
+    typesList.add(types);
+    typesList.add(types);
+    typesList.add(types);
 
     session.insertRecords(deviceIds, timestamps, measurementsList, typesList, 
valuesList);
   }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
index f57015cfab3..a93fe75f2b9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ThriftClient.java
@@ -25,6 +25,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.net.SocketException;
 
@@ -102,9 +103,11 @@ public interface ThriftClient {
    * @param cause Throwable
    * @return true/false
    */
-  public static boolean isConnectionBroken(Throwable cause) {
+  static boolean isConnectionBroken(Throwable cause) {
     return (cause instanceof SocketException && 
cause.getMessage().contains("Broken pipe"))
         || (cause instanceof TTransportException
-            && cause.getMessage().contains("Socket is closed by peer"));
+            && cause.getMessage().contains("Socket is closed by peer"))
+        || (cause instanceof IOException
+            && cause.getMessage().contains("Connection reset by peer"));
   }
 }

Reply via email to