This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/TableModelIngestion by this push:
     new 077dfe7d988 add  interface in session
077dfe7d988 is described below

commit 077dfe7d988a855e6d1ec5ad62626bdb898d4413
Author: jt2594838 <[email protected]>
AuthorDate: Fri Jun 21 15:52:38 2024 +0800

    add  interface in session
---
 .../java/org/apache/iotdb/isession/ISession.java   |   6 ++
 .../java/org/apache/iotdb/session/Session.java     | 105 +++++++++++----------
 .../apache/iotdb/session/SessionConnection.java    |  16 +++-
 3 files changed, 74 insertions(+), 53 deletions(-)

diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 85a8c9c00d5..7b07015f374 100644
--- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -336,6 +336,12 @@ public interface ISession extends AutoCloseable {
   void insertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException;
 
+  void insertRelationalTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException;
+
+  void insertRelationalTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException;
+
   void insertAlignedTablet(Tablet tablet)
       throws StatementExecutionException, IoTDBConnectionException;
 
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 533e0c9df47..a7303eb6d91 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -70,6 +70,7 @@ import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.UnSupportedDataTypeException;
 import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.slf4j.Logger;
@@ -618,7 +619,9 @@ public class Session implements ISession {
     this.zoneId = ZoneId.of(zoneId);
   }
 
-  /** Only changes the member variable of the Session object without sending it to server. */
+  /**
+   * Only changes the member variable of the Session object without sending it to server.
+   */
   @Override
   public void setTimeZoneOfSession(String zoneId) {
     defaultSessionConnection.setTimeZoneOfSession(zoneId);
@@ -989,7 +992,7 @@ public class Session implements ISession {
    *
    * @param paths timeSeries eg. root.ln.d1.s1,root.ln.d1.s2
    * @param lastTime get the last data, whose timestamp is greater than or equal lastTime e.g.
-   *     1621326244168
+   * 1621326244168
    */
   @Override
   public SessionDataSet executeLastDataQuery(List<String> paths, long lastTime, long timeOut)
@@ -1013,7 +1016,8 @@ public class Session implements ISession {
   }
 
   /**
-   * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> + <suffixPath> = <TimeSeries>
+   * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> + <suffixPath> =
+   * <TimeSeries>
    *
    * @param paths timeSeries. eg.root.ln.d1.s1,root.ln.d1.s2
    */
@@ -1047,7 +1051,7 @@ public class Session implements ISession {
 
         // reconnect with default connection
         return defaultSessionConnection.executeLastDataQueryForOneDevice(
-                db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
+            db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
             .left;
       } else {
         throw e;
@@ -1258,7 +1262,7 @@ public class Session implements ISession {
       TEndPoint endPoint = null;
       if (endPointToSessionConnection != null) {
         for (Iterator<Entry<TEndPoint, SessionConnection>> it =
-                endPointToSessionConnection.entrySet().iterator();
+            endPointToSessionConnection.entrySet().iterator();
             it.hasNext(); ) {
           Entry<TEndPoint, SessionConnection> entry = it.next();
           if (entry.getValue().equals(sessionConnection)) {
@@ -1541,11 +1545,6 @@ public class Session implements ISession {
 
   /**
    * When the value is null,filter this,don't use this measurement.
-   *
-   * @param times
-   * @param measurementsList
-   * @param valuesList
-   * @param typesList
    */
   private void filterNullValueAndMeasurement(
       List<String> deviceIds,
@@ -1574,12 +1573,6 @@ public class Session implements ISession {
 
   /**
    * Filter the null value of list。
-   *
-   * @param deviceId
-   * @param times
-   * @param measurementsList
-   * @param typesList
-   * @param valuesList
    */
   private void filterNullValueAndMeasurementOfOneDevice(
       String deviceId,
@@ -1607,11 +1600,6 @@ public class Session implements ISession {
 
   /**
    * Filter the null value of list。
-   *
-   * @param times
-   * @param deviceId
-   * @param measurementsList
-   * @param valuesList
    */
   private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
       List<Long> times,
@@ -1637,10 +1625,6 @@ public class Session implements ISession {
   /**
    * Filter the null object of list。
    *
-   * @param deviceId
-   * @param measurementsList
-   * @param types
-   * @param valuesList
    * @return true:all value is null;false:not all null value is null.
    */
   private boolean filterNullValueAndMeasurement(
@@ -1670,9 +1654,6 @@ public class Session implements ISession {
    * Filter the null object of list。
    *
    * @param prefixPaths devices path。
-   * @param times
-   * @param measurementsList
-   * @param valuesList
    * @return true:all values of valuesList are null;false:Not all values of valuesList are null.
    */
   private void filterNullValueAndMeasurementWithStringType(
@@ -1700,8 +1681,6 @@ public class Session implements ISession {
   /**
    * When the value is null,filter this,don't use this measurement.
    *
-   * @param valuesList
-   * @param measurementsList
    * @return true:all value is null;false:not all null value is null.
    */
   private boolean filterNullValueAndMeasurementWithStringType(
@@ -2599,6 +2578,11 @@ public class Session implements ISession {
   public void insertTablet(Tablet tablet, boolean sorted)
       throws IoTDBConnectionException, StatementExecutionException {
     TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+    insertTabletInternal(tablet, request);
+  }
+
+  private void insertTabletInternal(Tablet tablet, TSInsertTabletReq request)
+      throws IoTDBConnectionException, StatementExecutionException {
     try {
       getSessionConnection(tablet.getDeviceId()).insertTablet(request);
     } catch (RedirectException e) {
@@ -2621,6 +2605,34 @@ public class Session implements ISession {
     }
   }
 
+  /**
+   * insert a relational Tablet
+   *
+   * @param tablet data batch
+   * @param sorted deprecated, whether times in Tablet are in ascending order
+   */
+  @Override
+  public void insertRelationalTablet(Tablet tablet, boolean sorted)
+      throws IoTDBConnectionException, StatementExecutionException {
+    TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+    request.setWriteToTable(true);
+    request.setColumnCategories(
+        tablet.getColumnTypes().stream().map(t -> (byte) t.ordinal()).collect(
+            Collectors.toList()));
+    insertTabletInternal(tablet, request);
+  }
+
+  /**
+   * insert a relational Tablet
+   *
+   * @param tablet data batch
+   */
+  @Override
+  public void insertRelationalTablet(Tablet tablet)
+      throws IoTDBConnectionException, StatementExecutionException {
+    insertRelationalTablet(tablet, false);
+  }
+
   /**
    * insert the aligned timeseries data of a device. For each timestamp, the number of measurements
    * is the same.
@@ -3228,7 +3240,7 @@ public class Session implements ISession {
   }
 
   @SuppressWarnings({
-    "squid:S3776"
+      "squid:S3776"
   }) // ignore Cognitive Complexity of methods should not be too high
   public void sortTablet(Tablet tablet) {
     /*
@@ -3422,16 +3434,14 @@ public class Session implements ISession {
    *
    * @param name name of the template
    * @param schemaNames it works as a virtual layer inside template in 0.12, and makes no difference
-   *     after 0.13
+   * after 0.13
    * @param measurements the first measurement in each nested list will constitute the final flat
-   *     template
+   * template
    * @param dataTypes the data type of each measurement, only the first one in each nested list
-   *     matters as above
+   * matters as above
    * @param encodings the encoding of each measurement, only the first one in each nested list
-   *     matters as above
+   * matters as above
    * @param compressors the compressor of each measurement
-   * @throws IoTDBConnectionException
-   * @throws StatementExecutionException
    * @deprecated
    */
   @Override
@@ -3463,7 +3473,7 @@ public class Session implements ISession {
   /**
    * @param templateName Template to add aligned measurements.
    * @param measurementsPath If measurements get different prefix, or the prefix already exists in
-   *     template but not aligned, throw exception.
+   * template but not aligned, throw exception.
    * @param dataTypes Data type of these measurements.
    * @param encodings Encoding of these measurements.
    * @param compressors CompressionType of these measurements.
@@ -3490,7 +3500,7 @@ public class Session implements ISession {
   /**
    * @param templateName Template to add a single aligned measurement.
    * @param measurementPath If prefix of the path exists in template and not aligned, throw
-   *     exception.
+   * exception.
    */
   @Override
   public void addAlignedMeasurementInTemplate(
@@ -3731,18 +3741,14 @@ public class Session implements ISession {
   /**
    * @param recordsGroup connection to record map
    * @param insertConsumer insert function
-   * @param <T>
-   *     <ul>
-   *       <li>{@link TSInsertRecordsReq}
-   *       <li>{@link TSInsertStringRecordsReq}
-   *       <li>{@link TSInsertTabletsReq}
-   *     </ul>
-   *
-   * @throws IoTDBConnectionException
-   * @throws StatementExecutionException
+   * @param <T> <ul>
+   * <li>{@link TSInsertRecordsReq}
+   * <li>{@link TSInsertStringRecordsReq}
+   * <li>{@link TSInsertTabletsReq}
+   * </ul>
    */
   @SuppressWarnings({
-    "squid:S3776"
+      "squid:S3776"
   }) // ignore Cognitive Complexity of methods should not be too high
   private <T> void insertByGroup(
       Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
@@ -3827,6 +3833,7 @@ public class Session implements ISession {
   }
 
   public static class Builder {
+
     private String host = SessionConfig.DEFAULT_HOST;
     private int rpcPort = SessionConfig.DEFAULT_PORT;
     private String username = SessionConfig.DEFAULT_USER;
diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 10b4295db20..6b0d9ed955a 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -1061,9 +1061,8 @@ public class SessionConnection {
     return client.insertStringRecordsOfOneDevice(request);
   }
 
-  protected void insertTablet(TSInsertTabletReq request)
-      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
-
+  protected void withRetry(TFunction<TSStatus> function)
+      throws StatementExecutionException, RedirectException, IoTDBConnectionException {
     TException lastTException = null;
     TSStatus status = null;
     for (int i = 0; i <= maxRetryCount; i++) {
@@ -1083,7 +1082,7 @@ public class SessionConnection {
         }
       }
       try {
-        status = insertTabletInternal(request);
+        status = function.run();
         // need retry
         if (status.isSetNeedRetry() && status.isNeedRetry()) {
           continue;
@@ -1112,6 +1111,11 @@ public class SessionConnection {
     }
   }
 
+  protected void insertTablet(TSInsertTabletReq request)
+      throws IoTDBConnectionException, StatementExecutionException, RedirectException {
+    withRetry(() -> insertTabletInternal(request));
+  }
+
   private TSStatus insertTabletInternal(TSInsertTabletReq request) throws TException {
     request.setSessionId(sessionId);
     return client.insertTablet(request);
@@ -1659,4 +1663,8 @@ public class SessionConnection {
   public String toString() {
     return "SessionConnection{" + " endPoint=" + endPoint + "}";
   }
+
+  private interface TFunction<T> {
+    T run() throws TException;
+  }
 }

Reply via email to