This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/main by this push:
new 6ba97f5 Call VerifySuccess before return to user (#151)
6ba97f5 is described below
commit 6ba97f5048e769f6b382ade8cfe898a792855024
Author: Haonan <[email protected]>
AuthorDate: Thu Feb 12 18:03:50 2026 +0800
Call VerifySuccess before return to user (#151)
* Move VerifySuccess
* fix missing code
* fix missing code
* fix copilot review
---
client/errors.go | 29 ++-
client/session.go | 243 ++++++++++++++-------
client/tablesession.go | 14 +-
client/tablesessionpool.go | 66 ++++--
client/utils.go | 8 +-
example/session_example.go | 16 +-
example/session_pool/session_pool_example.go | 17 +-
.../table/table_session_pool_example.go | 9 +-
example/table/table_session_example.go | 9 +-
test/e2e/e2e_table_test.go | 6 +-
test/e2e/e2e_test.go | 28 +--
11 files changed, 251 insertions(+), 194 deletions(-)
diff --git a/client/errors.go b/client/errors.go
index 1fc3f8b..13f091e 100644
--- a/client/errors.go
+++ b/client/errors.go
@@ -20,29 +20,34 @@
package client
import (
- "bytes"
+ "fmt"
"github.com/apache/iotdb-client-go/v2/common"
)
+// ExecutionError represents an error returned by the server via TSStatus.
+// It is NOT a connection error and should not cause session drops.
+type ExecutionError struct {
+ Code int32
+ Message string
+}
+
+func (e *ExecutionError) Error() string {
+ if e.Message != "" {
+ return fmt.Sprintf("error code: %d, message: %v", e.Code,
e.Message)
+ }
+ return fmt.Sprintf("error code: %d", e.Code)
+}
+
type BatchError struct {
statuses []*common.TSStatus
+ Message string
}
func (e *BatchError) Error() string {
- buff := bytes.Buffer{}
- for _, status := range e.statuses {
- buff.WriteString(*status.Message + ";")
- }
- return buff.String()
+ return e.Message
}
func (e *BatchError) GetStatuses() []*common.TSStatus {
return e.statuses
}
-
-func NewBatchError(statuses []*common.TSStatus) *BatchError {
- return &BatchError{
- statuses: statuses,
- }
-}
diff --git a/client/session.go b/client/session.go
index e62f5e6..2cd1e82 100644
--- a/client/session.go
+++ b/client/session.go
@@ -228,14 +228,17 @@ func (s *Session) Close() error {
*return
*error: correctness of operation
*/
-func (s *Session) SetStorageGroup(storageGroupId string) (r *common.TSStatus,
err error) {
- r, err = s.client.SetStorageGroup(context.Background(), s.sessionId,
storageGroupId)
+func (s *Session) SetStorageGroup(storageGroupId string) error {
+ r, err := s.client.SetStorageGroup(context.Background(), s.sessionId,
storageGroupId)
if err != nil && r == nil {
if s.reconnect() {
r, err = s.client.SetStorageGroup(context.Background(),
s.sessionId, storageGroupId)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -245,14 +248,17 @@ func (s *Session) SetStorageGroup(storageGroupId string)
(r *common.TSStatus, er
*return
*error: correctness of operation
*/
-func (s *Session) DeleteStorageGroup(storageGroupId string) (r
*common.TSStatus, err error) {
- r, err = s.client.DeleteStorageGroups(context.Background(),
s.sessionId, []string{storageGroupId})
+func (s *Session) DeleteStorageGroup(storageGroupId string) error {
+ r, err := s.client.DeleteStorageGroups(context.Background(),
s.sessionId, []string{storageGroupId})
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteStorageGroups(context.Background(), s.sessionId,
[]string{storageGroupId})
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -262,14 +268,17 @@ func (s *Session) DeleteStorageGroup(storageGroupId
string) (r *common.TSStatus,
*return
*error: correctness of operation
*/
-func (s *Session) DeleteStorageGroups(storageGroupIds ...string) (r
*common.TSStatus, err error) {
- r, err = s.client.DeleteStorageGroups(context.Background(),
s.sessionId, storageGroupIds)
+func (s *Session) DeleteStorageGroups(storageGroupIds ...string) error {
+ r, err := s.client.DeleteStorageGroups(context.Background(),
s.sessionId, storageGroupIds)
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteStorageGroups(context.Background(), s.sessionId, storageGroupIds)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -282,7 +291,7 @@ func (s *Session) DeleteStorageGroups(storageGroupIds
...string) (r *common.TSSt
*return
*error: correctness of operation
*/
-func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags
map[string]string) (r *common.TSStatus, err error) {
+func (s *Session) CreateTimeseries(path string, dataType TSDataType, encoding
TSEncoding, compressor TSCompressionType, attributes map[string]string, tags
map[string]string) error {
request := rpc.TSCreateTimeseriesReq{
SessionId: s.sessionId, Path: path, DataType: int32(dataType),
Encoding: int32(encoding),
Compressor: int32(compressor), Attributes: attributes, Tags:
tags,
@@ -294,7 +303,10 @@ func (s *Session) CreateTimeseries(path string, dataType
TSDataType, encoding TS
status, err =
s.client.CreateTimeseries(context.Background(), &request)
}
}
- return status, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(status)
}
/*
@@ -309,7 +321,7 @@ func (s *Session) CreateTimeseries(path string, dataType
TSDataType, encoding TS
*return
*error: correctness of operation
*/
-func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors
[]TSCompressionType, measurementAlias []string) (r *common.TSStatus, err error)
{
+func (s *Session) CreateAlignedTimeseries(prefixPath string, measurements
[]string, dataTypes []TSDataType, encodings []TSEncoding, compressors
[]TSCompressionType, measurementAlias []string) error {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
@@ -341,7 +353,10 @@ func (s *Session) CreateAlignedTimeseries(prefixPath
string, measurements []stri
status, err =
s.client.CreateAlignedTimeseries(context.Background(), &request)
}
}
- return status, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(status)
}
/*
@@ -354,7 +369,7 @@ func (s *Session) CreateAlignedTimeseries(prefixPath
string, measurements []stri
*return
*error: correctness of operation
*/
-func (s *Session) CreateMultiTimeseries(paths []string, dataTypes
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) (r
*common.TSStatus, err error) {
+func (s *Session) CreateMultiTimeseries(paths []string, dataTypes
[]TSDataType, encodings []TSEncoding, compressors []TSCompressionType) error {
destTypes := make([]int32, len(dataTypes))
for i, t := range dataTypes {
destTypes[i] = int32(t)
@@ -374,7 +389,7 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
SessionId: s.sessionId, Paths: paths, DataTypes: destTypes,
Encodings: destEncodings, Compressors: destCompressions,
}
- r, err = s.client.CreateMultiTimeseries(context.Background(), &request)
+ r, err := s.client.CreateMultiTimeseries(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
@@ -383,7 +398,10 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -393,14 +411,17 @@ func (s *Session) CreateMultiTimeseries(paths []string,
dataTypes []TSDataType,
*return
*error: correctness of operation
*/
-func (s *Session) DeleteTimeseries(paths []string) (r *common.TSStatus, err
error) {
- r, err = s.client.DeleteTimeseries(context.Background(), s.sessionId,
paths)
+func (s *Session) DeleteTimeseries(paths []string) error {
+ r, err := s.client.DeleteTimeseries(context.Background(), s.sessionId,
paths)
if err != nil && r == nil {
if s.reconnect() {
r, err =
s.client.DeleteTimeseries(context.Background(), s.sessionId, paths)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -412,16 +433,19 @@ func (s *Session) DeleteTimeseries(paths []string) (r
*common.TSStatus, err erro
*return
*error: correctness of operation
*/
-func (s *Session) DeleteData(paths []string, startTime int64, endTime int64)
(r *common.TSStatus, err error) {
+func (s *Session) DeleteData(paths []string, startTime int64, endTime int64)
error {
request := rpc.TSDeleteDataReq{SessionId: s.sessionId, Paths: paths,
StartTime: startTime, EndTime: endTime}
- r, err = s.client.DeleteData(context.Background(), &request)
+ r, err := s.client.DeleteData(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.DeleteData(context.Background(),
&request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -434,19 +458,22 @@ func (s *Session) DeleteData(paths []string, startTime
int64, endTime int64) (r
*return
*error: correctness of operation
*/
-func (s *Session) InsertStringRecord(deviceId string, measurements []string,
values []string, timestamp int64) (r *common.TSStatus, err error) {
+func (s *Session) InsertStringRecord(deviceId string, measurements []string,
values []string, timestamp int64) error {
request := rpc.TSInsertStringRecordReq{
SessionId: s.sessionId, PrefixPath: deviceId, Measurements:
measurements,
Values: values, Timestamp: timestamp,
}
- r, err = s.client.InsertStringRecord(context.Background(), &request)
+ r, err := s.client.InsertStringRecord(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertStringRecord(context.Background(), &request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) GetTimeZone() (string, error) {
@@ -457,11 +484,17 @@ func (s *Session) GetTimeZone() (string, error) {
return resp.TimeZone, nil
}
-func (s *Session) SetTimeZone(timeZone string) (r *common.TSStatus, err error)
{
+func (s *Session) SetTimeZone(timeZone string) error {
request := rpc.TSSetTimeZoneReq{SessionId: s.sessionId, TimeZone:
timeZone}
- r, err = s.client.SetTimeZone(context.Background(), &request)
+ r, err := s.client.SetTimeZone(context.Background(), &request)
+ if err != nil {
+ return err
+ }
+ if err := VerifySuccess(r); err != nil {
+ return err
+ }
s.config.TimeZone = timeZone
- return r, err
+ return nil
}
func (s *Session) ExecuteStatementWithContext(ctx context.Context, sql string)
(*SessionDataSet, error) {
@@ -491,7 +524,7 @@ func (s *Session) ExecuteStatement(sql string)
(*SessionDataSet, error) {
return s.ExecuteStatementWithContext(context.Background(), sql)
}
-func (s *Session) ExecuteNonQueryStatement(sql string) (r *common.TSStatus,
err error) {
+func (s *Session) ExecuteNonQueryStatement(sql string) error {
request := rpc.TSExecuteStatementReq{
SessionId: s.sessionId,
Statement: sql,
@@ -507,11 +540,13 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err
resp, err =
s.client.ExecuteStatementV2(context.Background(), &request)
}
}
+ if err != nil {
+ return err
+ }
if resp.IsSetDatabase() {
s.changeDatabase(*resp.Database)
}
-
- return resp.Status, err
+ return VerifySuccess(resp.Status)
}
func (s *Session) changeDatabase(database string) {
@@ -648,12 +683,12 @@ func (s *Session) genTSInsertRecordReq(deviceId string,
time int64,
return request, nil
}
-func (s *Session) InsertRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) (r
*common.TSStatus, err error) {
+func (s *Session) InsertRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) error {
request, err := s.genTSInsertRecordReq(deviceId, timestamp,
measurements, dataTypes, values, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertRecord(context.Background(), request)
+ r, err := s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -662,15 +697,18 @@ func (s *Session) InsertRecord(deviceId string,
measurements []string, dataTypes
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedRecord(deviceId string, measurements []string,
dataTypes []TSDataType, values []interface{}, timestamp int64) error {
request, err := s.genTSInsertRecordReq(deviceId, timestamp,
measurements, dataTypes, values, true)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertRecord(context.Background(), request)
+ r, err := s.client.InsertRecord(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -679,7 +717,10 @@ func (s *Session) InsertAlignedRecord(deviceId string,
measurements []string, da
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
type deviceData struct {
@@ -709,10 +750,10 @@ func (d *deviceData) Swap(i, j int) {
// executeBatch, we pack some insert request in batch and send them to server.
If you want improve
// your performance, please see insertTablet method
// Each row is independent, which could have different insertTargetName, time,
number of measurements
-func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
+func (s *Session) InsertRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) error {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length ||
len(valuesSlice) != length {
- return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
+ return errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
}
if !sorted {
@@ -724,10 +765,11 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId
string, timestamps []int64,
})
}
+ var err error
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i],
valuesSlice[i], measurementsSlice[i]); err != nil {
- return nil, err
+ return err
}
}
@@ -739,7 +781,7 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId string,
timestamps []int64,
ValuesList: valuesList,
}
- r, err = s.client.InsertRecordsOfOneDevice(context.Background(),
request)
+ r, err := s.client.InsertRecordsOfOneDevice(context.Background(),
request)
if err != nil && r == nil {
if s.reconnect() {
@@ -748,13 +790,16 @@ func (s *Session) InsertRecordsOfOneDevice(deviceId
string, timestamps []int64,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) (r *common.TSStatus, err error) {
+func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId string, timestamps
[]int64, measurementsSlice [][]string, dataTypesSlice [][]TSDataType,
valuesSlice [][]interface{}, sorted bool) error {
length := len(timestamps)
if len(measurementsSlice) != length || len(dataTypesSlice) != length ||
len(valuesSlice) != length {
- return nil, errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
+ return errors.New("timestamps, measurementsSlice and valuesSlice's size should be equal")
}
if !sorted {
@@ -766,10 +811,11 @@ func (s *Session)
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
})
}
+ var err error
valuesList := make([][]byte, length)
for i := 0; i < length; i++ {
if valuesList[i], err = valuesToBytes(dataTypesSlice[i],
valuesSlice[i], measurementsSlice[i]); err != nil {
- return nil, err
+ return err
}
}
isAligned := true
@@ -782,7 +828,7 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId
string, timestamps []
IsAligned: &isAligned,
}
- r, err = s.client.InsertRecordsOfOneDevice(context.Background(),
request)
+ r, err := s.client.InsertRecordsOfOneDevice(context.Background(),
request)
if err != nil && r == nil {
if s.reconnect() {
@@ -791,7 +837,10 @@ func (s *Session) InsertAlignedRecordsOfOneDevice(deviceId
string, timestamps []
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
/*
@@ -808,37 +857,43 @@ func (s *Session)
InsertAlignedRecordsOfOneDevice(deviceId string, timestamps []
*/
func (s *Session) InsertRecords(deviceIds []string, measurements [][]string,
dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64,
-) (r *common.TSStatus, err error) {
+) error {
request, err := s.genInsertRecordsReq(deviceIds, measurements,
dataTypes, values, timestamps, false)
if err != nil {
- return nil, err
+ return err
} else {
- r, err = s.client.InsertRecords(context.Background(), request)
+ r, err := s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertRecords(context.Background(), request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
}
func (s *Session) InsertAlignedRecords(deviceIds []string, measurements
[][]string, dataTypes [][]TSDataType, values [][]interface{},
timestamps []int64,
-) (r *common.TSStatus, err error) {
+) error {
request, err := s.genInsertRecordsReq(deviceIds, measurements,
dataTypes, values, timestamps, true)
if err != nil {
- return nil, err
+ return err
} else {
- r, err = s.client.InsertRecords(context.Background(), request)
+ r, err := s.client.InsertRecords(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.InsertRecords(context.Background(), request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
}
@@ -847,63 +902,72 @@ func (s *Session) InsertAlignedRecords(deviceIds
[]string, measurements [][]stri
*params
*tablets: []*client.Tablet, list of tablets
*/
-func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertTablets(tablets []*Tablet, sorted bool) error {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
- return nil, err
+ return err
}
}
}
request, err := s.genInsertTabletsReq(tablets, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablets(context.Background(), request)
+ r, err := s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(),
request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedTablets(tablets []*Tablet, sorted bool) error {
if !sorted {
for _, t := range tablets {
if err := t.Sort(); err != nil {
- return nil, err
+ return err
}
}
}
request, err := s.genInsertTabletsReq(tablets, true)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablets(context.Background(), request)
+ r, err := s.client.InsertTablets(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err = s.client.InsertTablets(context.Background(),
request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) ExecuteBatchStatement(inserts []string) (r *common.TSStatus,
err error) {
+func (s *Session) ExecuteBatchStatement(inserts []string) error {
request := rpc.TSExecuteBatchStatementReq{
SessionId: s.sessionId,
Statements: inserts,
}
- r, err = s.client.ExecuteBatchStatement(context.Background(), &request)
+ r, err := s.client.ExecuteBatchStatement(context.Background(), &request)
if err != nil && r == nil {
if s.reconnect() {
request.SessionId = s.sessionId
r, err =
s.client.ExecuteBatchStatement(context.Background(), &request)
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) ExecuteRawDataQuery(paths []string, startTime int64, endTime
int64) (*SessionDataSet, error) {
@@ -1111,17 +1175,17 @@ func valuesToBytes(dataTypes []TSDataType, values
[]interface{}, measurementName
return buff.Bytes(), nil
}
-func (s *Session) insertRelationalTablet(tablet *Tablet) (r *common.TSStatus,
err error) {
+func (s *Session) insertRelationalTablet(tablet *Tablet) error {
if tablet.Len() == 0 {
- return &common.TSStatus{Code: SuccessStatus}, nil
+ return nil
}
request, err := s.genTSInsertTabletReq(tablet, true, true)
if err != nil {
- return nil, err
+ return err
}
request.ColumnCategories = tablet.getColumnCategories()
- r, err = s.client.InsertTablet(context.Background(), request)
+ r, err := s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -1130,21 +1194,24 @@ func (s *Session) insertRelationalTablet(tablet
*Tablet) (r *common.TSStatus, er
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertTablet(tablet *Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertTablet(tablet *Tablet, sorted bool) error {
if !sorted {
if err := tablet.Sort(); err != nil {
- return nil, err
+ return err
}
}
request, err := s.genTSInsertTabletReq(tablet, false, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablet(context.Background(), request)
+ r, err := s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -1153,21 +1220,24 @@ func (s *Session) InsertTablet(tablet *Tablet, sorted
bool) (r *common.TSStatus,
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
-func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) (r
*common.TSStatus, err error) {
+func (s *Session) InsertAlignedTablet(tablet *Tablet, sorted bool) error {
if !sorted {
if err := tablet.Sort(); err != nil {
- return nil, err
+ return err
}
}
request, err := s.genTSInsertTabletReq(tablet, true, false)
if err != nil {
- return nil, err
+ return err
}
- r, err = s.client.InsertTablet(context.Background(), request)
+ r, err := s.client.InsertTablet(context.Background(), request)
if err != nil && r == nil {
if s.reconnect() {
@@ -1176,7 +1246,10 @@ func (s *Session) InsertAlignedTablet(tablet *Tablet,
sorted bool) (r *common.TS
}
}
- return r, err
+ if err != nil {
+ return err
+ }
+ return VerifySuccess(r)
}
func (s *Session) genTSInsertTabletReq(tablet *Tablet, isAligned bool,
writeToTable bool) (*rpc.TSInsertTabletReq, error) {
diff --git a/client/tablesession.go b/client/tablesession.go
index cea724d..dce44f9 100644
--- a/client/tablesession.go
+++ b/client/tablesession.go
@@ -19,8 +19,6 @@
package client
-import "github.com/apache/iotdb-client-go/v2/common"
-
// ITableSession defines an interface for interacting with IoTDB tables.
// It supports operations such as data insertion, executing queries, and
closing the session.
// Implementations of this interface are expected to manage connections and
ensure
@@ -39,9 +37,8 @@ type ITableSession interface {
// - tablet: A pointer to a Tablet containing time-series data to be
inserted.
//
// Returns:
- // - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a
connection error or execution failure.
- Insert(tablet *Tablet) (r *common.TSStatus, err error)
+ Insert(tablet *Tablet) error
// ExecuteNonQueryStatement executes a non-query SQL statement, such as
a DDL or DML command.
//
@@ -49,9 +46,8 @@ type ITableSession interface {
// - sql: The SQL statement to execute.
//
// Returns:
- // - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation, such as a
connection error or execution failure.
- ExecuteNonQueryStatement(sql string) (r *common.TSStatus, err error)
+ ExecuteNonQueryStatement(sql string) error
// ExecuteQueryStatement executes a query SQL statement and returns the
result set.
//
@@ -125,9 +121,8 @@ func NewClusterTableSession(clusterConfig *ClusterConfig,
enableRPCCompression b
// - tablet: A pointer to a Tablet containing the data to be inserted.
//
// Returns:
-// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if the operation fails.
-func (s *TableSession) Insert(tablet *Tablet) (r *common.TSStatus, err error) {
+func (s *TableSession) Insert(tablet *Tablet) error {
return s.session.insertRelationalTablet(tablet)
}
@@ -137,9 +132,8 @@ func (s *TableSession) Insert(tablet *Tablet) (r
*common.TSStatus, err error) {
// - sql: The SQL statement to be executed.
//
// Returns:
-// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if the operation fails.
-func (s *TableSession) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err error) {
+func (s *TableSession) ExecuteNonQueryStatement(sql string) error {
return s.session.ExecuteNonQueryStatement(sql)
}
diff --git a/client/tablesessionpool.go b/client/tablesessionpool.go
index 5df5713..482d56e 100644
--- a/client/tablesessionpool.go
+++ b/client/tablesessionpool.go
@@ -20,10 +20,9 @@
package client
import (
+ "errors"
"log"
"sync/atomic"
-
- "github.com/apache/iotdb-client-go/v2/common"
)
// TableSessionPool manages a pool of ITableSession instances, enabling
efficient
@@ -75,23 +74,41 @@ type PooledTableSession struct {
closed int32
}
+// isConnectionError returns true if the error is a connection-level error
+// (i.e., not a server-side execution error indicated by TSStatus).
+func isConnectionError(err error) bool {
+ if err == nil {
+ return false
+ }
+ var exeErr *ExecutionError
+ if errors.As(err, &exeErr) {
+ return false
+ }
+ var batchErr *BatchError
+ if errors.As(err, &batchErr) {
+ return false
+ }
+ return true
+}
+
// Insert inserts a Tablet into the database.
//
// Parameters:
// - tablet: A pointer to a Tablet containing time-series data to be
inserted.
//
// Returns:
-// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation.
-func (s *PooledTableSession) Insert(tablet *Tablet) (r *common.TSStatus, err
error) {
- r, err = s.session.insertRelationalTablet(tablet)
+func (s *PooledTableSession) Insert(tablet *Tablet) error {
+ err := s.session.insertRelationalTablet(tablet)
if err == nil {
- return
+ return nil
}
- s.sessionPool.dropSession(s.session)
- atomic.StoreInt32(&s.closed, 1)
- s.session = Session{}
- return
+ if isConnectionError(err) {
+ s.sessionPool.dropSession(s.session)
+ atomic.StoreInt32(&s.closed, 1)
+ s.session = Session{}
+ }
+ return err
}
// ExecuteNonQueryStatement executes a non-query SQL statement, such as a DDL
or DML command.
@@ -100,17 +117,18 @@ func (s *PooledTableSession) Insert(tablet *Tablet) (r
*common.TSStatus, err err
// - sql: The SQL statement to execute.
//
// Returns:
-// - r: A pointer to TSStatus indicating the execution result.
// - err: An error if an issue occurs during the operation.
-func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err error) {
- r, err = s.session.ExecuteNonQueryStatement(sql)
+func (s *PooledTableSession) ExecuteNonQueryStatement(sql string) error {
+ err := s.session.ExecuteNonQueryStatement(sql)
if err == nil {
- return
+ return nil
}
- s.sessionPool.dropSession(s.session)
- atomic.StoreInt32(&s.closed, 1)
- s.session = Session{}
- return
+ if isConnectionError(err) {
+ s.sessionPool.dropSession(s.session)
+ atomic.StoreInt32(&s.closed, 1)
+ s.session = Session{}
+ }
+ return err
}
// ExecuteQueryStatement executes a query SQL statement and returns the result
set.
@@ -127,9 +145,11 @@ func (s *PooledTableSession) ExecuteQueryStatement(sql
string, timeoutInMs *int6
if err == nil {
return sessionDataSet, nil
}
- s.sessionPool.dropSession(s.session)
- atomic.StoreInt32(&s.closed, 1)
- s.session = Session{}
+ if isConnectionError(err) {
+ s.sessionPool.dropSession(s.session)
+ atomic.StoreInt32(&s.closed, 1)
+ s.session = Session{}
+ }
return nil, err
}
@@ -140,8 +160,8 @@ func (s *PooledTableSession) ExecuteQueryStatement(sql
string, timeoutInMs *int6
func (s *PooledTableSession) Close() error {
if atomic.CompareAndSwapInt32(&s.closed, 0, 1) {
if s.session.config.Database != s.sessionPool.config.Database
&& s.sessionPool.config.Database != "" {
- r, err := s.session.ExecuteNonQueryStatement("use " +
s.sessionPool.config.Database)
- if r.Code == ExecuteStatementError || err != nil {
+ err := s.session.ExecuteNonQueryStatement("use " +
s.sessionPool.config.Database)
+ if err != nil {
log.Println("Failed to change back database by executing: use ", s.sessionPool.config.Database)
s.session.Close()
return nil
diff --git a/client/utils.go b/client/utils.go
index 7ed8f27..276cbe4 100644
--- a/client/utils.go
+++ b/client/utils.go
@@ -249,7 +249,7 @@ func verifySuccesses(statuses []*common.TSStatus) error {
}
errMsg := buff.String()
if len(errMsg) > 0 {
- return NewBatchError(statuses)
+ return &BatchError{statuses, errMsg}
}
return nil
}
@@ -266,11 +266,11 @@ func VerifySuccess(status *common.TSStatus) error {
return nil
}
if status.Code != SuccessStatus {
+ msg := ""
if status.Message != nil {
- return fmt.Errorf("error code: %d, message: %v",
status.Code, *status.Message)
- } else {
- return fmt.Errorf("error code: %d", status.Code)
+ msg = *status.Message
}
+ return &ExecutionError{Code: status.Code, Message: msg}
}
return nil
}
diff --git a/example/session_example.go b/example/session_example.go
index 87e5490..c169960 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -465,9 +465,9 @@ func deleteData() {
func insertTablet() {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertTablet(tablet, false)
+ err = session.InsertTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -475,9 +475,9 @@ func insertTablet() {
func insertAlignedTablet() {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertAlignedTablet(tablet, false)
+ err = session.InsertAlignedTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -642,14 +642,8 @@ func executeBatchStatement() {
}
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/example/session_pool/session_pool_example.go
b/example/session_pool/session_pool_example.go
index 6b043d1..bd56471 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -28,7 +28,6 @@ import (
"time"
"github.com/apache/iotdb-client-go/v2/client"
- "github.com/apache/iotdb-client-go/v2/common"
)
var (
@@ -410,9 +409,9 @@ func insertTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertTablet(tablet, false)
+ err := session.InsertTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -425,9 +424,9 @@ func insertAlignedTablet() {
defer sessionPool.PutBack(session)
if err == nil {
if tablet, err := createTablet(12); err == nil {
- status, err := session.InsertAlignedTablet(tablet, false)
+ err := session.InsertAlignedTablet(tablet, false)
tablet.Reset()
- checkError(status, err)
+ checkError(err)
} else {
log.Fatal(err)
}
@@ -725,14 +724,8 @@ func printDataSet2(sds *client.SessionDataSet) {
}
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/example/session_pool/table/table_session_pool_example.go b/example/session_pool/table/table_session_pool_example.go
index a6a85cb..e14f1cc 100644
--- a/example/session_pool/table/table_session_pool_example.go
+++ b/example/session_pool/table/table_session_pool_example.go
@@ -27,7 +27,6 @@ import (
"time"
"github.com/apache/iotdb-client-go/v2/client"
- "github.com/apache/iotdb-client-go/v2/common"
)
func main() {
@@ -168,14 +167,8 @@ func sessionPoolWithoutSpecificDatabaseExample() {
wg.Wait()
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/example/table/table_session_example.go b/example/table/table_session_example.go
index c0aed28..fc43f12 100644
--- a/example/table/table_session_example.go
+++ b/example/table/table_session_example.go
@@ -27,7 +27,6 @@ import (
"time"
"github.com/apache/iotdb-client-go/v2/client"
- "github.com/apache/iotdb-client-go/v2/common"
)
func main() {
@@ -159,14 +158,8 @@ func query(session client.ITableSession) {
}
}
-func checkError(status *common.TSStatus, err error) {
+func checkError(err error) {
if err != nil {
log.Fatal(err)
}
-
- if status != nil {
- if err = client.VerifySuccess(status); err != nil {
- log.Println(err)
- }
- }
}
diff --git a/test/e2e/e2e_table_test.go b/test/e2e/e2e_table_test.go
index 7f019f8..b5c092a 100644
--- a/test/e2e/e2e_table_test.go
+++ b/test/e2e/e2e_table_test.go
@@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/apache/iotdb-client-go/v2/client"
- "github.com/apache/iotdb-client-go/v2/common"
)
var (
@@ -394,9 +393,6 @@ func getValueFromDataSetByIndex(dataSet *client.SessionDataSet, columnIndex int3
return v
}
-func (s *e2eTableTestSuite) checkError(status *common.TSStatus, err error) {
+func (s *e2eTableTestSuite) checkError(err error) {
s.Require().NoError(err)
- if status != nil {
- s.Require().NoError(client.VerifySuccess(status))
- }
}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 2557955..8fd2167 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -31,7 +31,6 @@ import (
"github.com/stretchr/testify/suite"
"github.com/apache/iotdb-client-go/v2/client"
- "github.com/apache/iotdb-client-go/v2/common"
)
type e2eTestSuite struct {
@@ -61,20 +60,17 @@ func (s *e2eTestSuite) TearDownSuite() {
}
func (s *e2eTestSuite) SetupTest() {
- r, err := s.session.SetStorageGroup("root.tsg1")
- s.checkError(r, err)
+ err := s.session.SetStorageGroup("root.tsg1")
+ s.checkError(err)
}
func (s *e2eTestSuite) TearDownTest() {
- r, err := s.session.DeleteStorageGroup("root.tsg1")
- s.checkError(r, err)
+ err := s.session.DeleteStorageGroup("root.tsg1")
+ s.checkError(err)
}
-func (s *e2eTestSuite) checkError(status *common.TSStatus, err error) {
+func (s *e2eTestSuite) checkError(err error) {
s.Require().NoError(err)
- if status != nil {
- s.Require().NoError(client.VerifySuccess(status))
- }
}
func (s *e2eTestSuite) Test_WrongURL() {
@@ -174,7 +170,7 @@ func (s *e2eTestSuite) Test_InsertRecordsWithWrongType() {
values = [][]interface{}{{100.0, true}, {"aaa"}}
timestamp = []int64{1, 2}
)
- _, err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp)
+ err := s.session.InsertRecords(deviceId, measurements, dataTypes, values, timestamp)
assert := s.Require()
assert.NotNil(err)
assert.Equal("measurement s1 values[0] 100(float64) must be bool", err.Error())
@@ -255,8 +251,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTablet(12); err == nil {
- status, err := s.session.InsertAlignedTablet(tablet, false)
- s.checkError(status, err)
+ err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
@@ -277,8 +273,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTabletWithNilValue() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
if tablet, err := createTabletWithNil(12); err == nil {
- status, err := s.session.InsertAlignedTablet(tablet, false)
- s.checkError(status, err)
+ err := s.session.InsertAlignedTablet(tablet, false)
+ s.checkError(err)
tablet.Reset()
} else {
log.Fatal(err)
@@ -499,8 +495,8 @@ func (s *e2eTestSuite) Test_QueryAllDataType() {
tablet.SetValueAt("string", 9, 0)
tablet.RowSize = 1
- r, err := s.session.InsertAlignedTablet(tablet, true)
- s.checkError(r, err)
+ err = s.session.InsertAlignedTablet(tablet, true)
+ s.checkError(err)
sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1, s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil)
for {