This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch TableModelIngestion
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/TableModelIngestion by this
push:
new 077dfe7d988 add interface in session
077dfe7d988 is described below
commit 077dfe7d988a855e6d1ec5ad62626bdb898d4413
Author: jt2594838 <[email protected]>
AuthorDate: Fri Jun 21 15:52:38 2024 +0800
add interface in session
---
.../java/org/apache/iotdb/isession/ISession.java | 6 ++
.../java/org/apache/iotdb/session/Session.java | 105 +++++++++++----------
.../apache/iotdb/session/SessionConnection.java | 16 +++-
3 files changed, 74 insertions(+), 53 deletions(-)
diff --git
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
index 85a8c9c00d5..7b07015f374 100644
---
a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
+++
b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/ISession.java
@@ -336,6 +336,12 @@ public interface ISession extends AutoCloseable {
void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException;
+ void insertRelationalTablet(Tablet tablet, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException;
+
+ void insertRelationalTablet(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException;
+
void insertAlignedTablet(Tablet tablet)
throws StatementExecutionException, IoTDBConnectionException;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 533e0c9df47..a7303eb6d91 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -70,6 +70,7 @@ import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnType;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.slf4j.Logger;
@@ -618,7 +619,9 @@ public class Session implements ISession {
this.zoneId = ZoneId.of(zoneId);
}
- /** Only changes the member variable of the Session object without sending
it to server. */
+ /**
+ * Only changes the member variable of the Session object without sending it
to server.
+ */
@Override
public void setTimeZoneOfSession(String zoneId) {
defaultSessionConnection.setTimeZoneOfSession(zoneId);
@@ -989,7 +992,7 @@ public class Session implements ISession {
*
* @param paths timeSeries eg. root.ln.d1.s1,root.ln.d1.s2
* @param lastTime get the last data, whose timestamp is greater than or
equal lastTime e.g.
- * 1621326244168
+ * 1621326244168
*/
@Override
public SessionDataSet executeLastDataQuery(List<String> paths, long
lastTime, long timeOut)
@@ -1013,7 +1016,8 @@ public class Session implements ISession {
}
/**
- * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> +
<suffixPath> = <TimeSeries>
+ * query eg. select last status from root.ln.wf01.wt01; <PrefixPath> +
<suffixPath> =
+ * <TimeSeries>
*
* @param paths timeSeries. eg.root.ln.d1.s1,root.ln.d1.s2
*/
@@ -1047,7 +1051,7 @@ public class Session implements ISession {
// reconnect with default connection
return defaultSessionConnection.executeLastDataQueryForOneDevice(
- db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
+ db, device, sensors, isLegalPathNodes, queryTimeoutInMs)
.left;
} else {
throw e;
@@ -1258,7 +1262,7 @@ public class Session implements ISession {
TEndPoint endPoint = null;
if (endPointToSessionConnection != null) {
for (Iterator<Entry<TEndPoint, SessionConnection>> it =
- endPointToSessionConnection.entrySet().iterator();
+ endPointToSessionConnection.entrySet().iterator();
it.hasNext(); ) {
Entry<TEndPoint, SessionConnection> entry = it.next();
if (entry.getValue().equals(sessionConnection)) {
@@ -1541,11 +1545,6 @@ public class Session implements ISession {
/**
* When the value is null,filter this,don't use this measurement.
- *
- * @param times
- * @param measurementsList
- * @param valuesList
- * @param typesList
*/
private void filterNullValueAndMeasurement(
List<String> deviceIds,
@@ -1574,12 +1573,6 @@ public class Session implements ISession {
/**
* Filter the null value of list。
- *
- * @param deviceId
- * @param times
- * @param measurementsList
- * @param typesList
- * @param valuesList
*/
private void filterNullValueAndMeasurementOfOneDevice(
String deviceId,
@@ -1607,11 +1600,6 @@ public class Session implements ISession {
/**
* Filter the null value of list。
- *
- * @param times
- * @param deviceId
- * @param measurementsList
- * @param valuesList
*/
private void filterNullValueAndMeasurementWithStringTypeOfOneDevice(
List<Long> times,
@@ -1637,10 +1625,6 @@ public class Session implements ISession {
/**
* Filter the null object of list。
*
- * @param deviceId
- * @param measurementsList
- * @param types
- * @param valuesList
* @return true:all value is null;false:not all null value is null.
*/
private boolean filterNullValueAndMeasurement(
@@ -1670,9 +1654,6 @@ public class Session implements ISession {
* Filter the null object of list。
*
* @param prefixPaths devices path。
- * @param times
- * @param measurementsList
- * @param valuesList
* @return true:all values of valuesList are null;false:Not all values of
valuesList are null.
*/
private void filterNullValueAndMeasurementWithStringType(
@@ -1700,8 +1681,6 @@ public class Session implements ISession {
/**
* When the value is null,filter this,don't use this measurement.
*
- * @param valuesList
- * @param measurementsList
* @return true:all value is null;false:not all null value is null.
*/
private boolean filterNullValueAndMeasurementWithStringType(
@@ -2599,6 +2578,11 @@ public class Session implements ISession {
public void insertTablet(Tablet tablet, boolean sorted)
throws IoTDBConnectionException, StatementExecutionException {
TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+ insertTabletInternal(tablet, request);
+ }
+
+ private void insertTabletInternal(Tablet tablet, TSInsertTabletReq request)
+ throws IoTDBConnectionException, StatementExecutionException {
try {
getSessionConnection(tablet.getDeviceId()).insertTablet(request);
} catch (RedirectException e) {
@@ -2621,6 +2605,34 @@ public class Session implements ISession {
}
}
+ /**
+ * Inserts a relational Tablet; the request is flagged as a write to a table.
+ *
+ * @param tablet data batch
+ * @param sorted deprecated; whether the times in the Tablet are in ascending order
+ */
+ @Override
+ public void insertRelationalTablet(Tablet tablet, boolean sorted)
+ throws IoTDBConnectionException, StatementExecutionException {
+ TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted, false);
+ request.setWriteToTable(true);
+ request.setColumnCategories(
+ tablet.getColumnTypes().stream().map(t -> (byte) t.ordinal()).collect(
+ Collectors.toList()));
+ insertTabletInternal(tablet, request);
+ }
+
+ /**
+ * Inserts a relational Tablet, passing {@code sorted = false} to the two-argument overload.
+ *
+ * @param tablet data batch
+ */
+ @Override
+ public void insertRelationalTablet(Tablet tablet)
+ throws IoTDBConnectionException, StatementExecutionException {
+ insertRelationalTablet(tablet, false);
+ }
+
/**
* insert the aligned timeseries data of a device. For each timestamp, the
number of measurements
* is the same.
@@ -3228,7 +3240,7 @@ public class Session implements ISession {
}
@SuppressWarnings({
- "squid:S3776"
+ "squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
public void sortTablet(Tablet tablet) {
/*
@@ -3422,16 +3434,14 @@ public class Session implements ISession {
*
* @param name name of the template
* @param schemaNames it works as a virtual layer inside template in 0.12,
and makes no difference
- * after 0.13
+ * after 0.13
* @param measurements the first measurement in each nested list will
constitute the final flat
- * template
+ * template
* @param dataTypes the data type of each measurement, only the first one in
each nested list
- * matters as above
+ * matters as above
* @param encodings the encoding of each measurement, only the first one in
each nested list
- * matters as above
+ * matters as above
* @param compressors the compressor of each measurement
- * @throws IoTDBConnectionException
- * @throws StatementExecutionException
* @deprecated
*/
@Override
@@ -3463,7 +3473,7 @@ public class Session implements ISession {
/**
* @param templateName Template to add aligned measurements.
* @param measurementsPath If measurements get different prefix, or the
prefix already exists in
- * template but not aligned, throw exception.
+ * template but not aligned, throw exception.
* @param dataTypes Data type of these measurements.
* @param encodings Encoding of these measurements.
* @param compressors CompressionType of these measurements.
@@ -3490,7 +3500,7 @@ public class Session implements ISession {
/**
* @param templateName Template to add a single aligned measurement.
* @param measurementPath If prefix of the path exists in template and not
aligned, throw
- * exception.
+ * exception.
*/
@Override
public void addAlignedMeasurementInTemplate(
@@ -3731,18 +3741,14 @@ public class Session implements ISession {
/**
* @param recordsGroup connection to record map
* @param insertConsumer insert function
- * @param <T>
- * <ul>
- * <li>{@link TSInsertRecordsReq}
- * <li>{@link TSInsertStringRecordsReq}
- * <li>{@link TSInsertTabletsReq}
- * </ul>
- *
- * @throws IoTDBConnectionException
- * @throws StatementExecutionException
+ * @param <T> <ul>
+ * <li>{@link TSInsertRecordsReq}
+ * <li>{@link TSInsertStringRecordsReq}
+ * <li>{@link TSInsertTabletsReq}
+ * </ul>
*/
@SuppressWarnings({
- "squid:S3776"
+ "squid:S3776"
}) // ignore Cognitive Complexity of methods should not be too high
private <T> void insertByGroup(
Map<SessionConnection, T> recordsGroup, InsertConsumer<T> insertConsumer)
@@ -3827,6 +3833,7 @@ public class Session implements ISession {
}
public static class Builder {
+
private String host = SessionConfig.DEFAULT_HOST;
private int rpcPort = SessionConfig.DEFAULT_PORT;
private String username = SessionConfig.DEFAULT_USER;
diff --git
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
index 10b4295db20..6b0d9ed955a 100644
---
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
+++
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/SessionConnection.java
@@ -1061,9 +1061,8 @@ public class SessionConnection {
return client.insertStringRecordsOfOneDevice(request);
}
- protected void insertTablet(TSInsertTabletReq request)
- throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
-
+ protected void withRetry(TFunction<TSStatus> function)
+ throws StatementExecutionException, RedirectException,
IoTDBConnectionException {
TException lastTException = null;
TSStatus status = null;
for (int i = 0; i <= maxRetryCount; i++) {
@@ -1083,7 +1082,7 @@ public class SessionConnection {
}
}
try {
- status = insertTabletInternal(request);
+ status = function.run();
// need retry
if (status.isSetNeedRetry() && status.isNeedRetry()) {
continue;
@@ -1112,6 +1111,11 @@ public class SessionConnection {
}
}
+ protected void insertTablet(TSInsertTabletReq request)
+ throws IoTDBConnectionException, StatementExecutionException,
RedirectException {
+ withRetry(() -> insertTabletInternal(request));
+ }
+
private TSStatus insertTabletInternal(TSInsertTabletReq request) throws
TException {
request.setSessionId(sessionId);
return client.insertTablet(request);
@@ -1659,4 +1663,8 @@ public class SessionConnection {
public String toString() {
return "SessionConnection{" + " endPoint=" + endPoint + "}";
}
+
+ private interface TFunction<T> {
+ T run() throws TException;
+ }
}