This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb-client-go.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new fad2e86 [to dev/1.3] add tsblock
fad2e86 is described below
commit fad2e86531ca448d2606df40962e47764b052826
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jun 18 17:13:45 2025 +0800
[to dev/1.3] add tsblock
---
client/column.go | 821 +++++++++++++++++++++++
client/column_decoder.go | 296 +++++++++
client/protocol.go | 48 ++
client/rpcdataset.go | 960 +++++++++++++--------------
client/rpcdataset_test.go | 666 -------------------
client/session.go | 64 +-
client/sessiondataset.go | 150 +++--
client/tablet.go | 4 +-
client/tsblock.go | 162 +++++
client/utils.go | 22 +-
example/session_example.go | 128 ++--
example/session_pool/session_pool_example.go | 146 ++--
test/e2e/e2e_test.go | 299 ++++++++-
13 files changed, 2307 insertions(+), 1459 deletions(-)
diff --git a/client/column.go b/client/column.go
new file mode 100644
index 0000000..23cb274
--- /dev/null
+++ b/client/column.go
@@ -0,0 +1,821 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import "fmt"
+
+// ColumnEncoding identifies how a column is encoded in its serialized form.
+type ColumnEncoding uint8
+
+// Known column encodings. Values are assigned by iota in declaration order,
+// starting at 0.
+const (
+	BYTE_ARRAY_COLUMN_ENCODING ColumnEncoding = iota
+	INT32_ARRAY_COLUMN_ENCODING
+	INT64_ARRAY_COLUMN_ENCODING
+	BINARY_ARRAY_COLUMN_ENCODING
+	RLE_COLUMN_ENCODING
+)
+
+// encodingToDecoder maps every supported encoding to the decoder instance
+// used to read columns of that encoding.
+var encodingToDecoder = map[ColumnEncoding]ColumnDecoder{
+	BYTE_ARRAY_COLUMN_ENCODING:   &ByteArrayColumnDecoder{},
+	INT32_ARRAY_COLUMN_ENCODING:  &Int32ArrayColumnDecoder{},
+	INT64_ARRAY_COLUMN_ENCODING:  &Int64ArrayColumnDecoder{},
+	BINARY_ARRAY_COLUMN_ENCODING: &BinaryArrayColumnDecoder{},
+	RLE_COLUMN_ENCODING:          &RunLengthColumnDecoder{},
+}
+
+// byteToEncoding maps a serialized encoding byte to its ColumnEncoding.
+var byteToEncoding = map[byte]ColumnEncoding{
+	0: BYTE_ARRAY_COLUMN_ENCODING,
+	1: INT32_ARRAY_COLUMN_ENCODING,
+	2: INT64_ARRAY_COLUMN_ENCODING,
+	3: BINARY_ARRAY_COLUMN_ENCODING,
+	4: RLE_COLUMN_ENCODING,
+}
+
+// getColumnDecoder looks up the decoder registered for encoding in
+// encodingToDecoder. It returns an error when no decoder is registered.
+func getColumnDecoder(encoding ColumnEncoding) (ColumnDecoder, error) {
+	if decoder, ok := encodingToDecoder[encoding]; ok {
+		return decoder, nil
+	}
+	return nil, fmt.Errorf("unsupported column encoding: %v", encoding)
+}
+
+func getColumnEncodingByByte(b byte) (ColumnEncoding, error) {
+ encoding, exists := byteToEncoding[b]
+ if !exists {
+ return INT32_ARRAY_COLUMN_ENCODING, fmt.Errorf("invalid value:
%v", b)
+ }
+ return encoding, nil
+}
+
+// Column is a typed, read-only sequence of values addressed by position.
+// Implementations embed baseColumn, whose value accessors all return an
+// "unsupported operation" error, and override the accessors they support.
+type Column interface {
+	// GetDataType reports the data type of the values held by the column.
+	GetDataType() TSDataType
+	// GetEncoding reports the column encoding associated with the column.
+	GetEncoding() ColumnEncoding
+
+	// Single-value accessors, addressed by position.
+	GetBoolean(position int32) (bool, error)
+	GetInt(position int32) (int32, error)
+	GetLong(position int32) (int64, error)
+	GetFloat(position int32) (float32, error)
+	GetDouble(position int32) (float64, error)
+	GetBinary(position int32) (*Binary, error)
+	GetObject(position int32) (interface{}, error)
+
+	// Bulk accessors returning the column's values as a slice.
+	GetBooleans() ([]bool, error)
+	GetInts() ([]int32, error)
+	GetLongs() ([]int64, error)
+	GetFloats() ([]float32, error)
+	GetDoubles() ([]float64, error)
+	GetBinaries() ([]*Binary, error)
+	GetObjects() ([]interface{}, error)
+
+	// Null inspection.
+	MayHaveNull() bool
+	IsNull(position int32) bool
+	IsNulls() []bool
+
+	// GetPositionCount reports how many positions the column exposes.
+	GetPositionCount() int32
+}
+
+// baseColumn supplies default implementations of every value accessor of
+// Column. Each default returns the zero value and an "unsupported operation"
+// error; concrete column types embed baseColumn and override what they support.
+type baseColumn struct{}
+
+// GetBoolean is unsupported by default.
+func (*baseColumn) GetBoolean(int32) (bool, error) {
+	return false, fmt.Errorf("unsupported operation: GetBoolean")
+}
+
+// GetInt is unsupported by default.
+func (*baseColumn) GetInt(int32) (int32, error) {
+	return 0, fmt.Errorf("unsupported operation: GetInt")
+}
+
+// GetLong is unsupported by default.
+func (*baseColumn) GetLong(int32) (int64, error) {
+	return 0, fmt.Errorf("unsupported operation: GetLong")
+}
+
+// GetFloat is unsupported by default.
+func (*baseColumn) GetFloat(int32) (float32, error) {
+	return 0, fmt.Errorf("unsupported operation: GetFloat")
+}
+
+// GetDouble is unsupported by default.
+func (*baseColumn) GetDouble(int32) (float64, error) {
+	return 0, fmt.Errorf("unsupported operation: GetDouble")
+}
+
+// GetBinary is unsupported by default.
+func (*baseColumn) GetBinary(int32) (*Binary, error) {
+	return nil, fmt.Errorf("unsupported operation: GetBinary")
+}
+
+// GetObject is unsupported by default.
+func (*baseColumn) GetObject(int32) (interface{}, error) {
+	return nil, fmt.Errorf("unsupported operation: GetObject")
+}
+
+// GetBooleans is unsupported by default.
+func (*baseColumn) GetBooleans() ([]bool, error) {
+	return nil, fmt.Errorf("unsupported operation: GetBooleans")
+}
+
+// GetInts is unsupported by default.
+func (*baseColumn) GetInts() ([]int32, error) {
+	return nil, fmt.Errorf("unsupported operation: GetInts")
+}
+
+// GetLongs is unsupported by default.
+func (*baseColumn) GetLongs() ([]int64, error) {
+	return nil, fmt.Errorf("unsupported operation: GetLongs")
+}
+
+// GetFloats is unsupported by default.
+func (*baseColumn) GetFloats() ([]float32, error) {
+	return nil, fmt.Errorf("unsupported operation: GetFloats")
+}
+
+// GetDoubles is unsupported by default.
+func (*baseColumn) GetDoubles() ([]float64, error) {
+	return nil, fmt.Errorf("unsupported operation: GetDoubles")
+}
+
+// GetBinaries is unsupported by default.
+func (*baseColumn) GetBinaries() ([]*Binary, error) {
+	return nil, fmt.Errorf("unsupported operation: GetBinaries")
+}
+
+// GetObjects is unsupported by default.
+func (*baseColumn) GetObjects() ([]interface{}, error) {
+	return nil, fmt.Errorf("unsupported operation: GetObjects")
+}
+
+// TimeColumn is a column of int64 timestamps that never reports nulls. It
+// exposes positionCount entries of values starting at arrayOffset.
+type TimeColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	values        []int64
+}
+
+// NewTimeColumn creates a TimeColumn over values. It returns an error when
+// arrayOffset or positionCount is negative, or when values holds fewer than
+// positionCount entries after arrayOffset.
+func NewTimeColumn(arrayOffset int32, positionCount int32, values []int64) (*TimeColumn, error) {
+	if arrayOffset < 0 {
+		return nil, fmt.Errorf("arrayOffset is negative")
+	}
+	if positionCount < 0 {
+		// Report the argument that is actually invalid; this branch used to
+		// blame arrayOffset.
+		return nil, fmt.Errorf("positionCount is negative")
+	}
+	if int32(len(values))-arrayOffset < positionCount {
+		return nil, fmt.Errorf("values length is less than positionCount")
+	}
+	return &TimeColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		values:        values,
+	}, nil
+}
+
+// GetDataType returns INT64.
+func (tc *TimeColumn) GetDataType() TSDataType {
+	return INT64
+}
+
+// GetEncoding returns INT64_ARRAY_COLUMN_ENCODING.
+func (tc *TimeColumn) GetEncoding() ColumnEncoding {
+	return INT64_ARRAY_COLUMN_ENCODING
+}
+
+// GetLong returns the timestamp at position, relative to arrayOffset. The
+// position is not range-checked; the returned error is always nil.
+func (tc *TimeColumn) GetLong(position int32) (int64, error) {
+	return tc.values[position+tc.arrayOffset], nil
+}
+
+// MayHaveNull always returns false.
+func (tc *TimeColumn) MayHaveNull() bool {
+	return false
+}
+
+// IsNull always returns false.
+func (tc *TimeColumn) IsNull(_ int32) bool {
+	return false
+}
+
+// IsNulls always returns nil.
+func (tc *TimeColumn) IsNulls() []bool {
+	return nil
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (tc *TimeColumn) GetPositionCount() int32 {
+	return tc.positionCount
+}
+
+// GetStartTime returns the first timestamp of the column. It indexes the
+// backing slice directly, so the column must not be empty.
+func (tc *TimeColumn) GetStartTime() int64 {
+	return tc.values[tc.arrayOffset]
+}
+
+// GetEndTime returns the last timestamp of the column. It indexes the backing
+// slice directly, so the column must not be empty.
+func (tc *TimeColumn) GetEndTime() int64 {
+	return tc.values[tc.positionCount+tc.arrayOffset-1]
+}
+
+// GetTimes returns the backing slice as stored; arrayOffset and positionCount
+// are not applied.
+func (tc *TimeColumn) GetTimes() []int64 {
+	return tc.values
+}
+
+// GetLongs returns the same slice as GetTimes with a nil error.
+func (tc *TimeColumn) GetLongs() ([]int64, error) {
+	return tc.GetTimes(), nil
+}
+
+// BinaryColumn is a column of *Binary values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type BinaryColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []*Binary
+}
+
+// NewBinaryColumn validates its arguments and wraps them in a BinaryColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewBinaryColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []*Binary) (*BinaryColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &BinaryColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns TEXT.
+func (c *BinaryColumn) GetDataType() TSDataType {
+	return TEXT
+}
+
+// GetEncoding returns BINARY_ARRAY_COLUMN_ENCODING.
+func (c *BinaryColumn) GetEncoding() ColumnEncoding {
+	return BINARY_ARRAY_COLUMN_ENCODING
+}
+
+// GetBinary returns the value at position, relative to arrayOffset.
+func (c *BinaryColumn) GetBinary(position int32) (*Binary, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetBinaries returns the backing slice as stored.
+func (c *BinaryColumn) GetBinaries() ([]*Binary, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetBinary as an interface value.
+func (c *BinaryColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetBinary(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *BinaryColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *BinaryColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *BinaryColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *BinaryColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// IntColumn is a column of int32 values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type IntColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []int32
+}
+
+// NewIntColumn validates its arguments and wraps them in an IntColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewIntColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []int32) (*IntColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &IntColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns INT32.
+func (c *IntColumn) GetDataType() TSDataType {
+	return INT32
+}
+
+// GetEncoding returns INT32_ARRAY_COLUMN_ENCODING.
+func (c *IntColumn) GetEncoding() ColumnEncoding {
+	return INT32_ARRAY_COLUMN_ENCODING
+}
+
+// GetInt returns the value at position, relative to arrayOffset.
+func (c *IntColumn) GetInt(position int32) (int32, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetInts returns the backing slice as stored.
+func (c *IntColumn) GetInts() ([]int32, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetInt as an interface value.
+func (c *IntColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetInt(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *IntColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *IntColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *IntColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *IntColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// FloatColumn is a column of float32 values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type FloatColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []float32
+}
+
+// NewFloatColumn validates its arguments and wraps them in a FloatColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewFloatColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []float32) (*FloatColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &FloatColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns FLOAT.
+func (c *FloatColumn) GetDataType() TSDataType {
+	return FLOAT
+}
+
+// GetEncoding returns INT32_ARRAY_COLUMN_ENCODING.
+func (c *FloatColumn) GetEncoding() ColumnEncoding {
+	return INT32_ARRAY_COLUMN_ENCODING
+}
+
+// GetFloat returns the value at position, relative to arrayOffset.
+func (c *FloatColumn) GetFloat(position int32) (float32, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetFloats returns the backing slice as stored.
+func (c *FloatColumn) GetFloats() ([]float32, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetFloat as an interface value.
+func (c *FloatColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetFloat(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *FloatColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *FloatColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *FloatColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *FloatColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// LongColumn is a column of int64 values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type LongColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []int64
+}
+
+// NewLongColumn validates its arguments and wraps them in a LongColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewLongColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []int64) (*LongColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &LongColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns INT64.
+func (c *LongColumn) GetDataType() TSDataType {
+	return INT64
+}
+
+// GetEncoding returns INT64_ARRAY_COLUMN_ENCODING.
+func (c *LongColumn) GetEncoding() ColumnEncoding {
+	return INT64_ARRAY_COLUMN_ENCODING
+}
+
+// GetLong returns the value at position, relative to arrayOffset.
+func (c *LongColumn) GetLong(position int32) (int64, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetLongs returns the backing slice as stored.
+func (c *LongColumn) GetLongs() ([]int64, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetLong as an interface value.
+func (c *LongColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetLong(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *LongColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *LongColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *LongColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *LongColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// DoubleColumn is a column of float64 values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type DoubleColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []float64
+}
+
+// NewDoubleColumn validates its arguments and wraps them in a DoubleColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewDoubleColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []float64) (*DoubleColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &DoubleColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns DOUBLE.
+func (c *DoubleColumn) GetDataType() TSDataType {
+	return DOUBLE
+}
+
+// GetEncoding returns INT64_ARRAY_COLUMN_ENCODING.
+func (c *DoubleColumn) GetEncoding() ColumnEncoding {
+	return INT64_ARRAY_COLUMN_ENCODING
+}
+
+// GetDouble returns the value at position, relative to arrayOffset.
+func (c *DoubleColumn) GetDouble(position int32) (float64, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetDoubles returns the backing slice as stored.
+func (c *DoubleColumn) GetDoubles() ([]float64, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetDouble as an interface value.
+func (c *DoubleColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetDouble(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *DoubleColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *DoubleColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *DoubleColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *DoubleColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// BooleanColumn is a column of bool values with an optional null mask.
+// A nil valueIsNull means no position is null.
+type BooleanColumn struct {
+	baseColumn
+	arrayOffset   int32
+	positionCount int32
+	valueIsNull   []bool
+	values        []bool
+}
+
+// NewBooleanColumn validates its arguments and wraps them in a BooleanColumn.
+// Offsets and counts must be non-negative, and both values and a non-nil
+// valueIsNull must hold at least positionCount entries after arrayOffset.
+func NewBooleanColumn(arrayOffset int32, positionCount int32, valueIsNull []bool, values []bool) (*BooleanColumn, error) {
+	switch {
+	case arrayOffset < 0:
+		return nil, fmt.Errorf("arrayOffset is negative")
+	case positionCount < 0:
+		return nil, fmt.Errorf("positionCount is negative")
+	case int32(len(values))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("values length is less than positionCount")
+	case valueIsNull != nil && int32(len(valueIsNull))-arrayOffset < positionCount:
+		return nil, fmt.Errorf("isNull length is less than positionCount")
+	}
+	col := &BooleanColumn{
+		arrayOffset:   arrayOffset,
+		positionCount: positionCount,
+		valueIsNull:   valueIsNull,
+		values:        values,
+	}
+	return col, nil
+}
+
+// GetDataType returns BOOLEAN.
+func (c *BooleanColumn) GetDataType() TSDataType {
+	return BOOLEAN
+}
+
+// GetEncoding returns BYTE_ARRAY_COLUMN_ENCODING.
+func (c *BooleanColumn) GetEncoding() ColumnEncoding {
+	return BYTE_ARRAY_COLUMN_ENCODING
+}
+
+// GetBoolean returns the value at position, relative to arrayOffset.
+func (c *BooleanColumn) GetBoolean(position int32) (bool, error) {
+	idx := position + c.arrayOffset
+	return c.values[idx], nil
+}
+
+// GetBooleans returns the backing slice as stored.
+func (c *BooleanColumn) GetBooleans() ([]bool, error) {
+	return c.values, nil
+}
+
+// GetObject returns the result of GetBoolean as an interface value.
+func (c *BooleanColumn) GetObject(position int32) (interface{}, error) {
+	return c.GetBoolean(position)
+}
+
+// MayHaveNull reports whether a null mask is present.
+func (c *BooleanColumn) MayHaveNull() bool {
+	return c.valueIsNull != nil
+}
+
+// IsNull reports whether the value at position is marked null.
+func (c *BooleanColumn) IsNull(position int32) bool {
+	if c.valueIsNull == nil {
+		return false
+	}
+	return c.valueIsNull[position+c.arrayOffset]
+}
+
+// IsNulls returns the null mask as stored, or an all-false slice of
+// positionCount entries when there is no mask.
+func (c *BooleanColumn) IsNulls() []bool {
+	if c.valueIsNull == nil {
+		// make zero-initializes, so every entry is already false.
+		return make([]bool, c.positionCount)
+	}
+	return c.valueIsNull
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *BooleanColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
+
+// RunLengthEncodedColumn presents a single-position column as if its one
+// value were repeated positionCount times.
+type RunLengthEncodedColumn struct {
+	baseColumn
+	value         Column
+	positionCount int32
+}
+
+// NewRunLengthEncodedColumn wraps value, which must be non-nil and contain
+// exactly one position, as a column of positionCount positions. When value is
+// itself a *RunLengthEncodedColumn its inner column is used instead, so
+// run-length columns are never nested.
+func NewRunLengthEncodedColumn(value Column, positionCount int32) (*RunLengthEncodedColumn, error) {
+	if value == nil {
+		return nil, fmt.Errorf("value is null")
+	}
+	if value.GetPositionCount() != 1 {
+		return nil, fmt.Errorf("expected value to contain a single position but has %v positions", value.GetPositionCount())
+	}
+	if positionCount < 0 {
+		return nil, fmt.Errorf("positionCount is negative")
+	}
+	inner := value
+	if rle, ok := value.(*RunLengthEncodedColumn); ok {
+		inner = rle.GetValue()
+	}
+	return &RunLengthEncodedColumn{value: inner, positionCount: positionCount}, nil
+}
+
+// GetValue returns the wrapped single-position column.
+func (c *RunLengthEncodedColumn) GetValue() Column {
+	return c.value
+}
+
+// GetDataType returns the data type of the wrapped column.
+func (c *RunLengthEncodedColumn) GetDataType() TSDataType {
+	return c.value.GetDataType()
+}
+
+// GetEncoding returns RLE_COLUMN_ENCODING.
+func (c *RunLengthEncodedColumn) GetEncoding() ColumnEncoding {
+	return RLE_COLUMN_ENCODING
+}
+
+// GetBoolean ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetBoolean(_ int32) (bool, error) {
+	return c.value.GetBoolean(0)
+}
+
+// GetInt ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetInt(_ int32) (int32, error) {
+	return c.value.GetInt(0)
+}
+
+// GetLong ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetLong(_ int32) (int64, error) {
+	return c.value.GetLong(0)
+}
+
+// GetFloat ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetFloat(_ int32) (float32, error) {
+	return c.value.GetFloat(0)
+}
+
+// GetDouble ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetDouble(_ int32) (float64, error) {
+	return c.value.GetDouble(0)
+}
+
+// GetBinary ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetBinary(_ int32) (*Binary, error) {
+	return c.value.GetBinary(0)
+}
+
+// GetObject ignores the position and returns the wrapped column's value at 0.
+func (c *RunLengthEncodedColumn) GetObject(_ int32) (interface{}, error) {
+	return c.value.GetObject(0)
+}
+
+// GetBooleans returns positionCount copies of the wrapped boolean value.
+func (c *RunLengthEncodedColumn) GetBooleans() ([]bool, error) {
+	v, err := c.value.GetBoolean(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]bool, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetInts returns positionCount copies of the wrapped int32 value.
+func (c *RunLengthEncodedColumn) GetInts() ([]int32, error) {
+	v, err := c.value.GetInt(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]int32, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetLongs returns positionCount copies of the wrapped int64 value.
+func (c *RunLengthEncodedColumn) GetLongs() ([]int64, error) {
+	v, err := c.value.GetLong(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]int64, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetFloats returns positionCount copies of the wrapped float32 value.
+func (c *RunLengthEncodedColumn) GetFloats() ([]float32, error) {
+	v, err := c.value.GetFloat(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]float32, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetDoubles returns positionCount copies of the wrapped float64 value.
+func (c *RunLengthEncodedColumn) GetDoubles() ([]float64, error) {
+	v, err := c.value.GetDouble(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]float64, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetBinaries returns positionCount references to the wrapped *Binary value.
+func (c *RunLengthEncodedColumn) GetBinaries() ([]*Binary, error) {
+	v, err := c.value.GetBinary(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]*Binary, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// GetObjects returns positionCount copies of the wrapped value as interface
+// values.
+func (c *RunLengthEncodedColumn) GetObjects() ([]interface{}, error) {
+	v, err := c.value.GetObject(0)
+	if err != nil {
+		return nil, err
+	}
+	out := make([]interface{}, c.positionCount)
+	for i := range out {
+		out[i] = v
+	}
+	return out, nil
+}
+
+// MayHaveNull delegates to the wrapped column.
+func (c *RunLengthEncodedColumn) MayHaveNull() bool {
+	return c.value.MayHaveNull()
+}
+
+// IsNull ignores the position and reports whether the wrapped value is null.
+func (c *RunLengthEncodedColumn) IsNull(_ int32) bool {
+	return c.value.IsNull(0)
+}
+
+// IsNulls returns positionCount copies of the wrapped value's null flag.
+func (c *RunLengthEncodedColumn) IsNulls() []bool {
+	out := make([]bool, c.positionCount)
+	isNull := c.value.IsNull(0)
+	for i := range out {
+		out[i] = isNull
+	}
+	return out
+}
+
+// GetPositionCount returns the number of positions in the column.
+func (c *RunLengthEncodedColumn) GetPositionCount() int32 {
+	return c.positionCount
+}
diff --git a/client/column_decoder.go b/client/column_decoder.go
new file mode 100644
index 0000000..3367911
--- /dev/null
+++ b/client/column_decoder.go
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+	"bytes"
+	"encoding/binary"
+	"fmt"
+	"io"
+)
+
+// ColumnDecoder reads one serialized column from a byte reader.
+type ColumnDecoder interface {
+	// ReadTimeColumn reads positionCount timestamps into a TimeColumn.
+	ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error)
+	// ReadColumn reads positionCount values of dataType into a Column.
+	ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error)
+}
+
+// deserializeNullIndicators reads the leading "may have null" byte. When it
+// is zero the column has no null mask and (nil, nil) is returned; otherwise a
+// bit-packed mask of positionCount flags is read with deserializeBooleanArray.
+func deserializeNullIndicators(reader *bytes.Reader, positionCount int32) ([]bool, error) {
+	flag, err := reader.ReadByte()
+	if err != nil {
+		return nil, err
+	}
+	if flag == 0 {
+		return nil, nil
+	}
+	return deserializeBooleanArray(reader, positionCount)
+}
+
+// deserializeBooleanArray reads size booleans that are bit-packed eight per
+// byte, most significant bit first, and returns them as a []bool.
+//
+// It returns an error when size is negative or when the reader holds fewer
+// than (size+7)/8 bytes. A size of zero consumes nothing and yields an empty
+// slice.
+func deserializeBooleanArray(reader *bytes.Reader, size int32) ([]bool, error) {
+	if size < 0 {
+		return nil, fmt.Errorf("invalid boolean array size: %d", size)
+	}
+	// Do the rounding in int so that size+7 cannot overflow int32.
+	packedBytes := make([]byte, (int(size)+7)/8)
+
+	// io.ReadFull rejects short reads and, unlike a bare Read, succeeds for an
+	// empty buffer even when the reader is already exhausted.
+	if _, err := io.ReadFull(reader, packedBytes); err != nil {
+		return nil, err
+	}
+
+	// read null bits 8 at a time
+	output := make([]bool, size)
+	currentByte := 0
+	fullGroups := int(size) & ^0b111
+	for pos := 0; pos < fullGroups; pos += 8 {
+		b := packedBytes[currentByte]
+		currentByte++
+
+		output[pos+0] = (b & 0b10000000) != 0
+		output[pos+1] = (b & 0b01000000) != 0
+		output[pos+2] = (b & 0b00100000) != 0
+		output[pos+3] = (b & 0b00010000) != 0
+		output[pos+4] = (b & 0b00001000) != 0
+		output[pos+5] = (b & 0b00000100) != 0
+		output[pos+6] = (b & 0b00000010) != 0
+		output[pos+7] = (b & 0b00000001) != 0
+	}
+
+	// read last null bits
+	if remaining := int(size) % 8; remaining > 0 {
+		b := packedBytes[len(packedBytes)-1]
+		mask := uint8(0b10000000)
+
+		for pos := fullGroups; pos < int(size); pos++ {
+			output[pos] = (b & mask) != 0
+			mask >>= 1
+		}
+	}
+
+	return output, nil
+}
+
+// baseColumnDecoder provides the default ReadTimeColumn for decoders that
+// cannot produce a time column.
+type baseColumnDecoder struct{}
+
+// ReadTimeColumn always fails with an "unsupported operation" error.
+func (*baseColumnDecoder) ReadTimeColumn(*bytes.Reader, int32) (*TimeColumn, error) {
+	return nil, fmt.Errorf("unsupported operation: ReadTimeColumn")
+}
+
+// Int32ArrayColumnDecoder decodes columns whose values are 4 bytes wide.
+type Int32ArrayColumnDecoder struct {
+	baseColumnDecoder
+}
+
+// ReadColumn decodes positionCount values into an IntColumn (INT32, DATE) or
+// a FloatColumn (FLOAT). Any other data type is rejected. Values are read
+// big-endian; positions flagged null consume no bytes and stay zero.
+func (decoder *Int32ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
+	// Serialized data layout:
+	//    +---------------+-----------------+-------------+
+	//    | may have null | null indicators |   values    |
+	//    +---------------+-----------------+-------------+
+	//    | byte          | list[byte]      | list[int32] |
+	//    +---------------+-----------------+-------------+
+	nullIndicators, err := deserializeNullIndicators(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	hasNulls := nullIndicators != nil
+	switch dataType {
+	case INT32, DATE:
+		values := make([]int32, positionCount)
+		for i := range values {
+			if hasNulls && nullIndicators[i] {
+				continue
+			}
+			if err := binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
+				return nil, err
+			}
+		}
+		return NewIntColumn(0, positionCount, nullIndicators, values)
+	case FLOAT:
+		values := make([]float32, positionCount)
+		for i := range values {
+			if hasNulls && nullIndicators[i] {
+				continue
+			}
+			if err := binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
+				return nil, err
+			}
+		}
+		return NewFloatColumn(0, positionCount, nullIndicators, values)
+	default:
+		return nil, fmt.Errorf("invalid data type: %v", dataType)
+	}
+}
+
+// Int64ArrayColumnDecoder decodes columns whose values are 8 bytes wide,
+// including the time column.
+type Int64ArrayColumnDecoder struct {
+	baseColumnDecoder
+}
+
+// ReadTimeColumn decodes positionCount big-endian int64 timestamps into a
+// TimeColumn. It returns an error when the serialized column carries a null
+// mask or when the reader runs out of data before all timestamps are read.
+func (decoder *Int64ArrayColumnDecoder) ReadTimeColumn(reader *bytes.Reader, positionCount int32) (*TimeColumn, error) {
+	// Serialized data layout:
+	//    +---------------+-----------------+-------------+
+	//    | may have null | null indicators |   values    |
+	//    +---------------+-----------------+-------------+
+	//    | byte          | list[byte]      | list[int64] |
+	//    +---------------+-----------------+-------------+
+
+	nullIndicators, err := deserializeNullIndicators(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	if nullIndicators != nil {
+		return nil, fmt.Errorf("time column should not contain null values")
+	}
+	values := make([]int64, positionCount)
+	for i := int32(0); i < positionCount; i++ {
+		// Propagate read failures; previously the error was assigned but never
+		// checked, so truncated input silently produced zero timestamps.
+		if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
+			return nil, err
+		}
+	}
+	return NewTimeColumn(0, positionCount, values)
+}
+
+// ReadColumn decodes positionCount values into a LongColumn (INT64,
+// TIMESTAMP) or a DoubleColumn (DOUBLE). Any other data type is rejected.
+// Values are read big-endian; positions flagged null consume no bytes and
+// stay zero.
+func (decoder *Int64ArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
+	// Serialized data layout:
+	//    +---------------+-----------------+-------------+
+	//    | may have null | null indicators |   values    |
+	//    +---------------+-----------------+-------------+
+	//    | byte          | list[byte]      | list[int64] |
+	//    +---------------+-----------------+-------------+
+	nullIndicators, err := deserializeNullIndicators(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	switch dataType {
+	case INT64, TIMESTAMP:
+		values := make([]int64, positionCount)
+		for i := int32(0); i < positionCount; i++ {
+			if nullIndicators != nil && nullIndicators[i] {
+				continue
+			}
+			if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
+				return nil, err
+			}
+		}
+		return NewLongColumn(0, positionCount, nullIndicators, values)
+	case DOUBLE:
+		values := make([]float64, positionCount)
+		for i := int32(0); i < positionCount; i++ {
+			if nullIndicators != nil && nullIndicators[i] {
+				continue
+			}
+			if err = binary.Read(reader, binary.BigEndian, &values[i]); err != nil {
+				return nil, err
+			}
+		}
+		return NewDoubleColumn(0, positionCount, nullIndicators, values)
+	}
+	return nil, fmt.Errorf("invalid data type: %v", dataType)
+}
+
+// ByteArrayColumnDecoder decodes boolean columns.
+type ByteArrayColumnDecoder struct {
+	baseColumnDecoder
+}
+
+// ReadColumn decodes positionCount booleans into a BooleanColumn. Only the
+// BOOLEAN data type is accepted. Both the null mask and the values are read
+// with deserializeBooleanArray.
+func (decoder *ByteArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
+	// Serialized data layout:
+	//    +---------------+-----------------+-------------+
+	//    | may have null | null indicators |   values    |
+	//    +---------------+-----------------+-------------+
+	//    | byte          | list[byte]      | list[byte]  |
+	//    +---------------+-----------------+-------------+
+
+	if dataType != BOOLEAN {
+		return nil, fmt.Errorf("invalid data type: %v", dataType)
+	}
+	nulls, err := deserializeNullIndicators(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	flags, err := deserializeBooleanArray(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	return NewBooleanColumn(0, positionCount, nulls, flags)
+}
+
+// BinaryArrayColumnDecoder decodes columns of length-prefixed byte strings.
+type BinaryArrayColumnDecoder struct {
+	baseColumnDecoder
+}
+
+// ReadColumn decodes positionCount length-prefixed values into a
+// BinaryColumn. Only the TEXT data type is accepted. Positions flagged null
+// consume no bytes and are left as nil entries.
+//
+// It returns an error when a length prefix is negative or larger than the
+// unread remainder of reader, or when the reader ends before a value is
+// complete.
+func (decoder *BinaryArrayColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
+	// Serialized data layout:
+	//    +---------------+-----------------+-------------+
+	//    | may have null | null indicators |   values    |
+	//    +---------------+-----------------+-------------+
+	//    | byte          | list[byte]      | list[entry] |
+	//    +---------------+-----------------+-------------+
+	//
+	// Each entry is represented as:
+	//    +---------------+-------+
+	//    | value length  | value |
+	//    +---------------+-------+
+	//    | int32         | bytes |
+	//    +---------------+-------+
+
+	if TEXT != dataType {
+		return nil, fmt.Errorf("invalid data type: %v", dataType)
+	}
+	nullIndicators, err := deserializeNullIndicators(reader, positionCount)
+	if err != nil {
+		return nil, err
+	}
+	values := make([]*Binary, positionCount)
+	for i := int32(0); i < positionCount; i++ {
+		if nullIndicators != nil && nullIndicators[i] {
+			continue
+		}
+		var length int32
+		if err := binary.Read(reader, binary.BigEndian, &length); err != nil {
+			return nil, err
+		}
+		// Validate the prefix before allocating: a negative length would panic
+		// in make, and an oversized one would allocate for data that cannot
+		// be there.
+		if length < 0 {
+			return nil, fmt.Errorf("invalid binary value length: %d", length)
+		}
+		if int(length) > reader.Len() {
+			return nil, fmt.Errorf("binary value length %d exceeds remaining %d bytes", length, reader.Len())
+		}
+		value := make([]byte, length)
+		// io.ReadFull succeeds for an empty value even when the reader is
+		// exhausted, where a bare Read would report io.EOF.
+		if _, err := io.ReadFull(reader, value); err != nil {
+			return nil, err
+		}
+		values[i] = NewBinary(value)
+	}
+	return NewBinaryColumn(0, positionCount, nullIndicators, values)
+}
+
+// RunLengthColumnDecoder decodes run-length encoded columns.
+type RunLengthColumnDecoder struct {
+	baseColumnDecoder
+}
+
+// ReadColumn reads the encoding byte of the inner column, decodes a single
+// value of dataType with the matching decoder, and wraps it in a
+// RunLengthEncodedColumn of positionCount positions.
+func (decoder *RunLengthColumnDecoder) ReadColumn(reader *bytes.Reader, dataType TSDataType, positionCount int32) (Column, error) {
+	// Serialized data layout:
+	//    +-----------+-------------------------+
+	//    | encoding  | serialized inner column |
+	//    +-----------+-------------------------+
+	//    | byte      | list[byte]              |
+	//    +-----------+-------------------------+
+	innerEncoding, err := deserializeColumnEncoding(reader)
+	if err != nil {
+		return nil, err
+	}
+	innerDecoder, err := getColumnDecoder(innerEncoding)
+	if err != nil {
+		return nil, err
+	}
+	// The inner column always holds exactly one position.
+	inner, err := innerDecoder.ReadColumn(reader, dataType, 1)
+	if err != nil {
+		return nil, err
+	}
+	return NewRunLengthEncodedColumn(inner, positionCount)
+}
diff --git a/client/protocol.go b/client/protocol.go
index edc211e..faedfad 100644
--- a/client/protocol.go
+++ b/client/protocol.go
@@ -19,6 +19,8 @@
package client
+import "fmt"
+
type TSDataType int8
type TSEncoding uint8
@@ -39,6 +41,48 @@ const (
STRING TSDataType = 11
)
+// Lookup tables that resolve a TSDataType from its name or its one-byte code.
+var (
+	// tsTypeMap is keyed by the upper-case type name.
+	tsTypeMap = map[string]TSDataType{
+		"BOOLEAN":   BOOLEAN,
+		"INT32":     INT32,
+		"INT64":     INT64,
+		"FLOAT":     FLOAT,
+		"DOUBLE":    DOUBLE,
+		"TEXT":      TEXT,
+		"TIMESTAMP": TIMESTAMP,
+		"DATE":      DATE,
+		"BLOB":      BLOB,
+		"STRING":    STRING,
+	}
+
+	// byteToTsDataType is keyed by the numeric type code; codes 6 and 7 have
+	// no entry.
+	byteToTsDataType = map[byte]TSDataType{
+		0:  BOOLEAN,
+		1:  INT32,
+		2:  INT64,
+		3:  FLOAT,
+		4:  DOUBLE,
+		5:  TEXT,
+		8:  TIMESTAMP,
+		9:  DATE,
+		10: BLOB,
+		11: STRING,
+	}
+)
+
+// GetDataTypeByStr returns the TSDataType registered in tsTypeMap under name.
+// When name is not a key of that table it returns UNKNOWN and an error.
+func GetDataTypeByStr(name string) (TSDataType, error) {
+	if resolved, ok := tsTypeMap[name]; ok {
+		return resolved, nil
+	}
+	return UNKNOWN, fmt.Errorf("invalid input: %v", name)
+}
+
+// getDataTypeByByte returns the TSDataType registered in byteToTsDataType for
+// the code b. When b has no entry it returns UNKNOWN and an error.
+func getDataTypeByByte(b byte) (TSDataType, error) {
+	if resolved, ok := byteToTsDataType[b]; ok {
+		return resolved, nil
+	}
+	return UNKNOWN, fmt.Errorf("invalid input: %v", b)
+}
+
const (
PLAIN TSEncoding = 0
DICTIONARY TSEncoding = 1
@@ -202,3 +246,7 @@ const (
CqAlreadyExist int32 = 1402
CqUpdateLastExecTimeError int32 = 1403
)
+
+const (
+ TimestampColumnName = "Time"
+)
diff --git a/client/rpcdataset.go b/client/rpcdataset.go
index 11ec81a..0629d21 100644
--- a/client/rpcdataset.go
+++ b/client/rpcdataset.go
@@ -1,616 +1,600 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
package client
import (
"context"
- "encoding/binary"
- "errors"
"fmt"
"github.com/apache/iotdb-client-go/common"
- "math"
- "time"
-
"github.com/apache/iotdb-client-go/rpc"
+ "strconv"
+ "time"
)
-const (
- startIndex = 2
- flag = 0x80
-)
-
-var (
- errClosed error = errors.New("DataSet is Closed")
- tsTypeMap map[string]TSDataType = map[string]TSDataType{
- "BOOLEAN": BOOLEAN,
- "INT32": INT32,
- "INT64": INT64,
- "FLOAT": FLOAT,
- "DOUBLE": DOUBLE,
- "TEXT": TEXT,
- "TIMESTAMP": TIMESTAMP,
- "DATE": DATE,
- "BLOB": BLOB,
- "STRING": STRING,
- }
-)
+// startIndex is the ordinal given to the first value column in
+// columnOrdinalMap (ordinal 1 is used for the time column). Subtracting it
+// from an ordinal yields the column's index inside a TsBlock.
+const startIndex int32 = 2
type IoTDBRpcDataSet struct {
- columnCount int
- sessionId int64
- queryId int64
- lastReadWasNull bool
- rowsIndex int
- queryDataSet *rpc.TSQueryDataSet
sql string
- fetchSize int32
+ isClosed bool
+ client *rpc.IClientRPCServiceClient
columnNameList []string
- columnTypeList []TSDataType
+ columnTypeList []string
columnOrdinalMap map[string]int32
columnTypeDeduplicatedList []TSDataType
- currentBitmap []byte
- time []byte
- values [][]byte
- client *rpc.IClientRPCServiceClient
- emptyResultSet bool
- ignoreTimeStamp bool
- closed bool
- timeoutMs *int64
-}
+ fetchSize int32
+ timeout *int64
+ hasCachedRecord bool
+ lastReadWasNull bool
-func (s *IoTDBRpcDataSet) getColumnIndex(columnName string) int32 {
- if s.closed {
- return -1
- }
- return s.columnOrdinalMap[columnName] - startIndex
+ columnSize int32
+
+ sessionId int64
+ queryId int64
+ statementId int64
+ time int64
+ ignoreTimestamp bool
+ // indicates that there is still more data in server side and we can
call fetchResult to get more
+ moreData bool
+
+ queryResult [][]byte
+ curTsBlock *TsBlock
+ queryResultSize int32 // the length of queryResult
+ queryResultIndex int32 // the index of bytebuffer in queryResult
+ tsBlockSize int32 // the size of current tsBlock
+ tsBlockIndex int32 // the row index in current tsBlock
}
-func (s *IoTDBRpcDataSet) getColumnType(columnName string) TSDataType {
- if s.closed {
- return UNKNOWN
+func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypeList
[]string, columnNameIndex map[string]int32, ignoreTimestamp bool, moreData
bool, queryId int64, statementId int64, client *rpc.IClientRPCServiceClient,
sessionId int64, queryResult [][]byte, fetchSize int32, timeout *int64)
(rpcDataSet *IoTDBRpcDataSet, err error) {
+ ds := &IoTDBRpcDataSet{
+ sessionId: sessionId,
+ statementId: statementId,
+ ignoreTimestamp: ignoreTimestamp,
+ sql: sql,
+ queryId: queryId,
+ client: client,
+ fetchSize: fetchSize,
+ timeout: timeout,
+ moreData: moreData,
+ columnSize: int32(len(columnNameList)),
+ columnNameList: make([]string, 0, len(columnNameList)+1),
+ columnTypeList: make([]string, 0, len(columnTypeList)+1),
+ columnOrdinalMap: make(map[string]int32),
+ }
+ if !ignoreTimestamp {
+ ds.columnNameList = append(ds.columnNameList,
TimestampColumnName)
+ ds.columnTypeList = append(ds.columnTypeList, "INT64")
+ ds.columnOrdinalMap[TimestampColumnName] = 1
}
- return s.columnTypeDeduplicatedList[s.getColumnIndex(columnName)]
-}
-
-func (s *IoTDBRpcDataSet) isNullWithColumnName(columnName string) bool {
- return s.isNull(int(s.getColumnIndex(columnName)), s.rowsIndex-1)
-}
+ ds.columnNameList = append(ds.columnNameList, columnNameList...)
+ ds.columnTypeList = append(ds.columnTypeList, columnTypeList...)
-func (s *IoTDBRpcDataSet) isNull(columnIndex int, rowIndex int) bool {
- if s.closed {
- return true
- }
- bitmap := s.currentBitmap[columnIndex]
- shift := rowIndex % 8
- return ((flag >> shift) & (bitmap & 0xff)) == 0
-}
+ if columnNameIndex != nil {
+ deduplicatedColumnSize :=
getDeduplicatedColumnSize(columnNameIndex)
+ ds.columnTypeDeduplicatedList = make([]TSDataType,
deduplicatedColumnSize)
+ for i, name := range columnNameList {
+ if _, exists := ds.columnOrdinalMap[name]; exists {
+ continue
+ }
-func (s *IoTDBRpcDataSet) constructOneRow() error {
- if s.closed {
- return errClosed
- }
+ index := columnNameIndex[name]
+ targetIndex := index + startIndex
- // simulating buffer, read 8 bytes from data set and discard first 8
bytes which have been read.
- s.time = s.queryDataSet.Time[:8]
- s.queryDataSet.Time = s.queryDataSet.Time[8:]
+ valueExists := false
+ for _, v := range ds.columnOrdinalMap {
+ if v == targetIndex {
+ valueExists = true
+ break
+ }
+ }
- for i := 0; i < len(s.queryDataSet.BitmapList); i++ {
- bitmapBuffer := s.queryDataSet.BitmapList[i]
- if s.rowsIndex%8 == 0 {
- s.currentBitmap[i] = bitmapBuffer[0]
- s.queryDataSet.BitmapList[i] = bitmapBuffer[1:]
+ if !valueExists {
+ if int(index) <
len(ds.columnTypeDeduplicatedList) {
+ if
ds.columnTypeDeduplicatedList[index], err =
GetDataTypeByStr(columnTypeList[i]); err != nil {
+ return nil, err
+ }
+ }
+ }
+ ds.columnOrdinalMap[name] = targetIndex
}
- if !s.isNull(i, s.rowsIndex) {
- valueBuffer := s.queryDataSet.ValueList[i]
- dataType := s.columnTypeDeduplicatedList[i]
- switch dataType {
- case BOOLEAN:
- s.values[i] = valueBuffer[:1]
- s.queryDataSet.ValueList[i] = valueBuffer[1:]
- case INT32, DATE:
- s.values[i] = valueBuffer[:4]
- s.queryDataSet.ValueList[i] = valueBuffer[4:]
- case INT64, TIMESTAMP:
- s.values[i] = valueBuffer[:8]
- s.queryDataSet.ValueList[i] = valueBuffer[8:]
- case FLOAT:
- s.values[i] = valueBuffer[:4]
- s.queryDataSet.ValueList[i] = valueBuffer[4:]
- case DOUBLE:
- s.values[i] = valueBuffer[:8]
- s.queryDataSet.ValueList[i] = valueBuffer[8:]
- case TEXT, BLOB, STRING:
- length := bytesToInt32(valueBuffer[:4])
- s.values[i] = valueBuffer[4 : 4+length]
- s.queryDataSet.ValueList[i] =
valueBuffer[4+length:]
- default:
- return fmt.Errorf("unsupported data type %d",
dataType)
+ } else {
+ ds.columnTypeDeduplicatedList = make([]TSDataType, 0)
+ index := startIndex
+ for i := 0; i < len(columnNameList); i++ {
+ name := columnNameList[i]
+ if _, exists := ds.columnOrdinalMap[name]; !exists {
+ dataType, err :=
GetDataTypeByStr(columnTypeList[i])
+ if err != nil {
+ return nil, err
+ }
+ ds.columnTypeDeduplicatedList =
append(ds.columnTypeDeduplicatedList, dataType)
+ ds.columnOrdinalMap[name] = int32(index)
+ index++
}
}
}
- s.rowsIndex++
- return nil
+ ds.queryResult = queryResult
+ if queryResult != nil {
+ ds.queryResultSize = int32(len(queryResult))
+ } else {
+ ds.queryResultSize = 0
+ }
+ ds.queryResultIndex = 0
+ ds.tsBlockSize = 0
+ ds.tsBlockIndex = -1
+ return ds, nil
}
-func (s *IoTDBRpcDataSet) GetTimestamp() int64 {
- if s.closed {
- return -1
+func getDeduplicatedColumnSize(columnNameList map[string]int32) int {
+ uniqueIndexes := make(map[int32]struct{})
+ for _, idx := range columnNameList {
+ uniqueIndexes[idx] = struct{}{}
}
- return bytesToInt64(s.time)
+ return len(uniqueIndexes)
}
-func (s *IoTDBRpcDataSet) getText(columnName string) string {
- if s.closed {
- return ""
+func (s *IoTDBRpcDataSet) Close() (err error) {
+ if s.isClosed {
+ return nil
}
- if columnName == TimestampColumnName {
- return time.Unix(0,
bytesToInt64(s.time)*1000000).Format(time.RFC3339)
+ closeRequest := &rpc.TSCloseOperationReq{
+ SessionId: s.sessionId,
+ StatementId: &s.statementId,
+ QueryId: &s.queryId,
}
- columnIndex := s.getColumnIndex(columnName)
- if columnIndex < 0 || int(columnIndex) >= len(s.values) ||
s.isNull(int(columnIndex), s.rowsIndex-1) {
- s.lastReadWasNull = true
- return ""
+ var status *common.TSStatus
+ status, err = s.client.CloseOperation(context.Background(),
closeRequest)
+ if err == nil {
+ err = VerifySuccess(status)
}
- s.lastReadWasNull = false
- return s.getString(int(columnIndex),
s.columnTypeDeduplicatedList[columnIndex])
+ s.client = nil
+ s.isClosed = true
+ return err
}
-func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType)
string {
- if s.closed {
- return ""
+func (s *IoTDBRpcDataSet) Next() (result bool, err error) {
+ if s.hasCachedBlock() {
+ s.lastReadWasNull = false
+ err = s.constructOneRow()
+ return true, err
}
- valueBytes := s.values[columnIndex]
- switch dataType {
- case BOOLEAN:
- if valueBytes[0] != 0 {
- return "true"
+ if s.hasCachedByteBuffer() {
+ if err = s.constructOneTsBlock(); err != nil {
+ return false, err
}
- return "false"
- case INT32:
- return int32ToString(bytesToInt32(valueBytes))
- case INT64, TIMESTAMP:
- return int64ToString(bytesToInt64(valueBytes))
- case FLOAT:
- bits := binary.BigEndian.Uint32(valueBytes)
- return float32ToString(math.Float32frombits(bits))
- case DOUBLE:
- bits := binary.BigEndian.Uint64(valueBytes)
- return float64ToString(math.Float64frombits(bits))
- case TEXT, STRING:
- return string(valueBytes)
- case BLOB:
- return bytesToHexString(valueBytes)
- case DATE:
- date, err := bytesToDate(valueBytes)
+ err = s.constructOneRow()
+ return true, err
+ }
+
+ if s.moreData {
+ hasResultSet, err := s.fetchResults()
if err != nil {
- return ""
+ return false, err
+ }
+ if hasResultSet && s.hasCachedByteBuffer() {
+ if err = s.constructOneTsBlock(); err != nil {
+ return false, err
+ }
+ err = s.constructOneRow()
+ return true, err
}
- return date.Format("2006-01-02")
- default:
- return ""
}
+ err = s.Close()
+ if err != nil {
+ return false, err
+ }
+ return false, nil
}
-func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} {
- if s.closed {
- return nil
+func (s *IoTDBRpcDataSet) fetchResults() (bool, error) {
+ if s.isClosed {
+ return false, fmt.Errorf("this data set is already closed")
}
- columnIndex := int(s.getColumnIndex(columnName))
- if s.isNull(columnIndex, s.rowsIndex-1) {
- return nil
+ req := rpc.TSFetchResultsReq{
+ SessionId: s.sessionId,
+ Statement: s.sql,
+ FetchSize: s.fetchSize,
+ QueryId: s.queryId,
+ IsAlign: true,
}
+ req.Timeout = s.timeout
- dataType := s.getColumnType(columnName)
- valueBytes := s.values[columnIndex]
- switch dataType {
- case BOOLEAN:
- return valueBytes[0] != 0
- case INT32:
- return bytesToInt32(valueBytes)
- case INT64, TIMESTAMP:
- return bytesToInt64(valueBytes)
- case FLOAT:
- bits := binary.BigEndian.Uint32(valueBytes)
- return math.Float32frombits(bits)
- case DOUBLE:
- bits := binary.BigEndian.Uint64(valueBytes)
- return math.Float64frombits(bits)
- case TEXT, STRING:
- return string(valueBytes)
- case BLOB:
- return valueBytes
- case DATE:
- date, err := bytesToDate(valueBytes)
- if err != nil {
- return nil
- }
- return date
- default:
- return nil
+ resp, err := s.client.FetchResultsV2(context.Background(), &req)
+
+ if err != nil {
+ return false, err
}
-}
-func (s *IoTDBRpcDataSet) getRowRecord() (*RowRecord, error) {
- if s.closed {
- return nil, errClosed
+ if err = VerifySuccess(resp.Status); err != nil {
+ return false, err
}
- fields := make([]*Field, s.columnCount)
- for i := 0; i < s.columnCount; i++ {
- columnName := s.columnNameList[i]
- field := Field{
- name: columnName,
- dataType: s.getColumnType(columnName),
- value: s.getValue(columnName),
+ if !resp.HasResultSet {
+ err = s.Close()
+ } else {
+ s.queryResult = resp.GetQueryResult_()
+ s.queryResultIndex = 0
+ if s.queryResult != nil {
+ s.queryResultSize = int32(len(s.queryResult))
+ } else {
+ s.queryResultSize = 0
}
- fields[i] = &field
+ s.tsBlockSize = 0
+ s.tsBlockIndex = -1
}
- return &RowRecord{
- timestamp: s.GetTimestamp(),
- fields: fields,
- }, nil
+ return resp.HasResultSet, err
}
-func (s *IoTDBRpcDataSet) getBool(columnName string) bool {
- if s.closed {
- return false
- }
- columnIndex := s.getColumnIndex(columnName)
- if !s.isNull(int(columnIndex), s.rowsIndex-1) {
- return s.values[columnIndex][0] != 0
+// hasCachedBlock reports whether a TsBlock is loaded and still has a row
+// after the one at tsBlockIndex.
+func (s *IoTDBRpcDataSet) hasCachedBlock() bool {
+	if s.curTsBlock == nil {
+		return false
+	}
+	return s.tsBlockIndex < s.tsBlockSize-1
+}
+
+// hasCachedByteBuffer reports whether queryResult still holds an entry at
+// queryResultIndex that has not been turned into a TsBlock yet.
+func (s *IoTDBRpcDataSet) hasCachedByteBuffer() bool {
+	if s.queryResult == nil {
+		return false
+	}
+	return s.queryResultIndex < s.queryResultSize
+}
+
+// constructOneRow advances tsBlockIndex to the next row of the current
+// TsBlock, marks a record as cached and stores that row's time-column value
+// in s.time. It returns the error reported by the time column, if any.
+func (s *IoTDBRpcDataSet) constructOneRow() (err error) {
+	s.hasCachedRecord = true
+	s.tsBlockIndex++
+	timestamp, err := s.curTsBlock.GetTimeColumn().GetLong(s.tsBlockIndex)
+	// s.time is assigned whatever GetLong returned, even when err is non-nil.
+	s.time = timestamp
+	return err
+}
+
+func (s *IoTDBRpcDataSet) constructOneTsBlock() (err error) {
+ s.lastReadWasNull = false
+ curTsBlockBytes := s.queryResult[s.queryResultIndex]
+ s.queryResultIndex = s.queryResultIndex + 1
+ s.curTsBlock, err = DeserializeTsBlock(curTsBlockBytes)
+ if err != nil {
+ return err
}
- s.lastReadWasNull = true
- return false
+ s.tsBlockIndex = -1
+ s.tsBlockSize = s.curTsBlock.GetPositionCount()
+ return nil
}
-func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
- if s.closed {
- return errClosed
+// isNullByIndex reports whether the column at the 1-based columnIndex is null
+// in the current row. It returns an error when columnIndex cannot be resolved
+// to a column name.
+func (s *IoTDBRpcDataSet) isNullByIndex(columnIndex int32) (bool, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err != nil {
+		return false, err
 	}
+	// A negative offset is what the time column produces; it is never null.
+	if offset := s.columnOrdinalMap[name] - startIndex; offset >= 0 {
+		return s.isNull(offset, s.tsBlockIndex), nil
+	}
+	return false, nil
+}
- count := s.columnCount
- if count > len(dest) {
- count = len(dest)
+// isNullByColumnName reports whether the named column is null in the current
+// row. A name whose ordinal lies below startIndex (the time column, or a name
+// missing from columnOrdinalMap) is reported as not null.
+func (s *IoTDBRpcDataSet) isNullByColumnName(columnName string) bool {
+	offset := s.columnOrdinalMap[columnName] - startIndex
+	if offset < 0 {
+		// time column will never be null
+		return false
 	}
+	return s.isNull(offset, s.tsBlockIndex)
+}
- for i := 0; i < count; i++ {
- columnName := s.columnNameList[i]
- columnIndex := int(s.getColumnIndex(columnName))
- if s.isNull(columnIndex, s.rowsIndex-1) {
- continue
- }
+// isNull reports whether row rowNum of the current TsBlock's column at index
+// is null.
+func (s *IoTDBRpcDataSet) isNull(index int32, rowNum int32) bool {
+	column := s.curTsBlock.GetColumn(index)
+	return column.IsNull(rowNum)
+}
- dataType := s.getColumnType(columnName)
- d := dest[i]
- valueBytes := s.values[columnIndex]
- switch dataType {
- case BOOLEAN:
- switch t := d.(type) {
- case *bool:
- *t = valueBytes[0] != 0
- case *string:
- if valueBytes[0] != 0 {
- *t = "true"
- } else {
- *t = "false"
- }
- default:
- return fmt.Errorf("dest[%d] types must be *bool
or *string", i)
- }
+// getBooleanByIndex resolves the 1-based columnIndex to a column name and
+// returns that column's boolean value via getBoolean.
+func (s *IoTDBRpcDataSet) getBooleanByIndex(columnIndex int32) (bool, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err == nil {
+		return s.getBoolean(name)
+	}
+	return false, err
+}
- case INT32:
- switch t := d.(type) {
- case *int32:
- *t = bytesToInt32(valueBytes)
- case *string:
- *t = int32ToString(bytesToInt32(valueBytes))
- default:
- return fmt.Errorf("dest[%d] types must be
*int32 or *string", i)
- }
- case INT64, TIMESTAMP:
- switch t := d.(type) {
- case *int64:
- *t = bytesToInt64(valueBytes)
- case *string:
- *t = int64ToString(bytesToInt64(valueBytes))
- default:
- return fmt.Errorf("dest[%d] types must be
*int64 or *string", i)
- }
- case FLOAT:
- switch t := d.(type) {
- case *float32:
- bits := binary.BigEndian.Uint32(valueBytes)
- *t = math.Float32frombits(bits)
- case *string:
- bits := binary.BigEndian.Uint32(valueBytes)
- *t = float32ToString(math.Float32frombits(bits))
- default:
- return fmt.Errorf("dest[%d] types must be
*float32 or *string", i)
- }
- case DOUBLE:
- switch t := d.(type) {
- case *float64:
- bits := binary.BigEndian.Uint64(valueBytes)
- *t = math.Float64frombits(bits)
- case *string:
- bits := binary.BigEndian.Uint64(valueBytes)
- *t = float64ToString(math.Float64frombits(bits))
- default:
- return fmt.Errorf("dest[%d] types must be
*float64 or *string", i)
- }
- case TEXT, STRING:
- switch t := d.(type) {
- case *[]byte:
- *t = valueBytes
- case *string:
- *t = string(valueBytes)
- default:
- return fmt.Errorf("dest[%d] types must be
*[]byte or *string", i)
- }
- case BLOB:
- switch t := d.(type) {
- case *[]byte:
- *t = valueBytes
- case *string:
- *t = bytesToHexString(valueBytes)
- default:
- return fmt.Errorf("dest[%d] types must be
*[]byte or *string", i)
- }
- case DATE:
- switch t := d.(type) {
- case *time.Time:
- *t, _ = bytesToDate(valueBytes)
- case *string:
- *t = int32ToString(bytesToInt32(valueBytes))
- date, err := bytesToDate(valueBytes)
- if err != nil {
- *t = ""
- }
- *t = date.Format("2006-01-02")
- default:
- return fmt.Errorf("dest[%d] types must be
*time.Time or *string", i)
- }
- default:
- return nil
- }
+func (s *IoTDBRpcDataSet) getBoolean(columnName string) (bool, error) {
+ if err := s.checkRecord(); err != nil {
+ return false, err
}
- return nil
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
+ s.lastReadWasNull = false
+ return s.curTsBlock.GetColumn(index).GetBoolean(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return false, nil
+ }
+}
+
+// getDoubleByIndex resolves the 1-based columnIndex to a column name and
+// returns that column's float64 value via getDouble.
+func (s *IoTDBRpcDataSet) getDoubleByIndex(columnIndex int32) (float64, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err == nil {
+		return s.getDouble(name)
+	}
+	return 0, err
 }
-func (s *IoTDBRpcDataSet) getFloat(columnName string) float32 {
- if s.closed {
- return 0
+func (s *IoTDBRpcDataSet) getDouble(columnName string) (float64, error) {
+ if err := s.checkRecord(); err != nil {
+ return 0, err
}
- columnIndex := s.getColumnIndex(columnName)
- if !s.isNull(int(columnIndex), s.rowsIndex-1) {
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
s.lastReadWasNull = false
- bits := binary.BigEndian.Uint32(s.values[columnIndex])
- return math.Float32frombits(bits)
+ return s.curTsBlock.GetColumn(index).GetDouble(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return 0, nil
}
- s.lastReadWasNull = true
- return 0
}
-func (s *IoTDBRpcDataSet) getDouble(columnName string) float64 {
- if s.closed {
- return 0
+func (s *IoTDBRpcDataSet) getFloatByIndex(columnIndex int32) (float32, error) {
+ columnName, err := s.findColumnNameByIndex(columnIndex)
+ if err != nil {
+ return 0, err
}
- columnIndex := s.getColumnIndex(columnName)
+ return s.getFloat(columnName)
+}
- if !s.isNull(int(columnIndex), s.rowsIndex-1) {
+func (s *IoTDBRpcDataSet) getFloat(columnName string) (float32, error) {
+ if err := s.checkRecord(); err != nil {
+ return 0, err
+ }
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
s.lastReadWasNull = false
- bits := binary.BigEndian.Uint64(s.values[columnIndex])
- return math.Float64frombits(bits)
+ return s.curTsBlock.GetColumn(index).GetFloat(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return 0, nil
+ }
+}
+
+func (s *IoTDBRpcDataSet) getIntByIndex(columnIndex int32) (int32, error) {
+ columnName, err := s.findColumnNameByIndex(columnIndex)
+ if err != nil {
+ return 0, err
}
- s.lastReadWasNull = true
- return 0
+ return s.getInt(columnName)
}
-func (s *IoTDBRpcDataSet) getInt32(columnName string) int32 {
- if s.closed {
- return 0
+func (s *IoTDBRpcDataSet) getInt(columnName string) (int32, error) {
+ if err := s.checkRecord(); err != nil {
+ return 0, err
}
- columnIndex := s.getColumnIndex(columnName)
- if !s.isNull(int(columnIndex), s.rowsIndex-1) {
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
s.lastReadWasNull = false
- return bytesToInt32(s.values[columnIndex])
+ dataType := s.curTsBlock.GetColumn(index).GetDataType()
+ if dataType == INT64 {
+ if v, err :=
s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex); err != nil {
+ return 0, err
+ } else {
+ return int32(v), nil
+ }
+ }
+ return s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return 0, nil
}
+}
- s.lastReadWasNull = true
- return 0
+// getLongByIndex resolves the 1-based columnIndex to a column name and
+// returns that column's int64 value via getLong.
+func (s *IoTDBRpcDataSet) getLongByIndex(columnIndex int32) (int64, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err == nil {
+		return s.getLong(name)
+	}
+	return 0, err
 }
-func (s *IoTDBRpcDataSet) getInt64(columnName string) int64 {
- if s.closed {
- return 0
+func (s *IoTDBRpcDataSet) getLong(columnName string) (int64, error) {
+ if err := s.checkRecord(); err != nil {
+ return 0, err
}
if columnName == TimestampColumnName {
- return bytesToInt64(s.time)
+ s.lastReadWasNull = false
+ return s.curTsBlock.GetTimeByIndex(s.tsBlockIndex)
+ }
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
+ s.lastReadWasNull = false
+ return s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return 0, nil
}
+}
- columnIndex := s.getColumnIndex(columnName)
- bys := s.values[columnIndex]
+// getBinaryByIndex resolves the 1-based columnIndex to a column name and
+// returns that column's *Binary value via getBinary.
+func (s *IoTDBRpcDataSet) getBinaryByIndex(columnIndex int32) (*Binary, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err == nil {
+		return s.getBinary(name)
+	}
+	return nil, err
+}
- if !s.isNull(int(columnIndex), s.rowsIndex-1) {
+func (s *IoTDBRpcDataSet) getBinary(columnName string) (*Binary, error) {
+ if err := s.checkRecord(); err != nil {
+ return nil, err
+ }
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if !s.isNull(index, s.tsBlockIndex) {
s.lastReadWasNull = false
- return bytesToInt64(bys)
+ return s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex)
+ } else {
+ s.lastReadWasNull = true
+ return nil, nil
}
- s.lastReadWasNull = true
- return 0
}
-func (s *IoTDBRpcDataSet) hasCachedResults() bool {
- if s.closed {
- return false
+func (s *IoTDBRpcDataSet) getObjectByIndex(columnIndex int32) (interface{},
error) {
+ columnName, err := s.findColumnNameByIndex(columnIndex)
+ if err != nil {
+ return nil, err
}
- return s.queryDataSet != nil && len(s.queryDataSet.Time) > 0
+ return s.getObject(columnName)
}
-func (s *IoTDBRpcDataSet) next() (bool, error) {
- if s.closed {
- return false, errClosed
+func (s *IoTDBRpcDataSet) getObject(columnName string) (interface{}, error) {
+ if err := s.checkRecord(); err != nil {
+ return nil, err
}
-
- if s.hasCachedResults() {
- s.constructOneRow()
- return true, nil
+ if columnName == TimestampColumnName {
+ s.lastReadWasNull = false
+ if value, err := s.curTsBlock.GetTimeByIndex(s.tsBlockIndex);
err != nil {
+ return nil, err
+ } else {
+ return time.Unix(value/1e3, (value%1e3)*1e6), nil
+ }
}
- if s.emptyResultSet {
- return false, nil
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if index < 0 || index >= int32(len(s.columnTypeDeduplicatedList)) ||
s.isNull(index, s.tsBlockIndex) {
+ s.lastReadWasNull = true
+ return nil, nil
}
+ s.lastReadWasNull = false
+ return s.curTsBlock.GetColumn(index).GetObject(s.tsBlockIndex)
+}
- r, err := s.fetchResults()
- if err == nil && r {
- s.constructOneRow()
- return true, nil
+func (s *IoTDBRpcDataSet) getStringByIndex(columnIndex int32) (string, error) {
+ columnName, err := s.findColumnNameByIndex(columnIndex)
+ if err != nil {
+ return "", err
}
- return false, nil
+ return s.getValueByName(columnName)
}
-func (s *IoTDBRpcDataSet) fetchResults() (bool, error) {
- if s.closed {
- return false, errClosed
- }
- s.rowsIndex = 0
- req := rpc.TSFetchResultsReq{
- SessionId: s.sessionId,
- Statement: s.sql,
- FetchSize: s.fetchSize,
- QueryId: s.queryId,
- IsAlign: true,
- Timeout: s.timeoutMs,
+func (s *IoTDBRpcDataSet) getTimestampByIndex(columnIndex int32) (time.Time,
error) {
+ columnName, err := s.findColumnNameByIndex(columnIndex)
+ if err != nil {
+ return time.Time{}, err
}
- resp, err := s.client.FetchResults(context.Background(), &req)
+ return s.getTimestamp(columnName)
+}
+func (s *IoTDBRpcDataSet) getTimestamp(columnName string) (time.Time, error) {
+ longValue, err := s.getLong(columnName)
if err != nil {
- return false, err
+ return time.Time{}, err
+ }
+ if s.lastReadWasNull {
+ return time.Time{}, err
+ } else {
+ return time.Unix(longValue/1e3, (longValue%1e3)*1e6), nil
}
+}
- if err = VerifySuccess(resp.Status); err != nil {
- return false, err
+// GetDateByIndex resolves the 1-based columnIndex to a column name and returns
+// that column's date via GetDate. When the index cannot be resolved it returns
+// the zero time.Time together with the lookup error.
+func (s *IoTDBRpcDataSet) GetDateByIndex(columnIndex int32) (time.Time, error) {
+	name, err := s.findColumnNameByIndex(columnIndex)
+	if err == nil {
+		return s.GetDate(name)
 	}
+	return time.Time{}, err
+}
- if !resp.HasResultSet {
- s.emptyResultSet = true
+func (s *IoTDBRpcDataSet) GetDate(columnName string) (time.Time, error) {
+ intValue, err := s.getInt(columnName)
+ if err != nil {
+ return time.Time{}, err
+ }
+ if s.lastReadWasNull {
+ return time.Time{}, err
} else {
- s.queryDataSet = resp.GetQueryDataSet()
+ return Int32ToDate(intValue)
}
- return resp.HasResultSet, nil
}
-func (s *IoTDBRpcDataSet) IsClosed() bool {
- return s.closed
+// findColumn returns the ordinal stored for columnName in columnOrdinalMap,
+// or 0 when the name has no entry.
+func (s *IoTDBRpcDataSet) findColumn(columnName string) int32 {
+	ordinal := s.columnOrdinalMap[columnName]
+	return ordinal
 }
-func (s *IoTDBRpcDataSet) Close() (err error) {
- if s.IsClosed() {
- return nil
+func (s *IoTDBRpcDataSet) getValueByName(columnName string) (string, error) {
+ err := s.checkRecord()
+ if err != nil {
+ return "", err
}
- if s.client != nil {
- closeRequest := &rpc.TSCloseOperationReq{
- SessionId: s.sessionId,
- QueryId: &s.queryId,
+ if columnName == TimestampColumnName {
+ s.lastReadWasNull = false
+ if t, err := s.curTsBlock.GetTimeByIndex(s.tsBlockIndex); err
!= nil {
+ return "", err
+ } else {
+ return int64ToString(t), nil
}
+ }
+ index := s.columnOrdinalMap[columnName] - startIndex
+ if index < 0 || index >= int32(len(s.columnTypeDeduplicatedList)) ||
s.isNull(index, s.tsBlockIndex) {
+ s.lastReadWasNull = true
+ return "", err
+ }
+ s.lastReadWasNull = false
+ return s.getString(index, s.columnTypeDeduplicatedList[index])
+}
- var status *common.TSStatus
- status, err = s.client.CloseOperation(context.Background(),
closeRequest)
- if err == nil {
- err = VerifySuccess(status)
+// getString renders the value at the current row (tsBlockIndex) of the
+// TsBlock column at index as text, formatted according to tsDataType:
+//
+//   - BOOLEAN via strconv.FormatBool
+//   - INT32, INT64/TIMESTAMP, FLOAT and DOUBLE via the package's
+//     int32ToString / int64ToString / float32ToString / float64ToString helpers
+//   - TEXT/STRING as the binary's string value
+//   - BLOB as the result of bytesToHexString
+//   - DATE as "2006-01-02" after Int32ToDate
+//
+// Any error reported by the column accessor or by the date conversion is
+// returned to the caller. A data type that matches none of the cases yields
+// an empty string and a nil error.
+func (s *IoTDBRpcDataSet) getString(index int32, tsDataType TSDataType) (string, error) {
+	switch tsDataType {
+	case BOOLEAN:
+		v, err := s.curTsBlock.GetColumn(index).GetBoolean(s.tsBlockIndex)
+		if err != nil {
+			// Propagate the failure; returning nil here would report an
+			// empty string as a successful read.
+			return "", err
+		}
+		return strconv.FormatBool(v), nil
+	case INT32:
+		v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		return int32ToString(v), nil
+	case INT64, TIMESTAMP:
+		v, err := s.curTsBlock.GetColumn(index).GetLong(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		return int64ToString(v), nil
+	case FLOAT:
+		v, err := s.curTsBlock.GetColumn(index).GetFloat(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		return float32ToString(v), nil
+	case DOUBLE:
+		v, err := s.curTsBlock.GetColumn(index).GetDouble(s.tsBlockIndex)
+		if err != nil {
+			return "", err
 		}
+		return float64ToString(v), nil
+	case TEXT, STRING:
+		v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		return v.GetStringValue(), nil
+	case BLOB:
+		v, err := s.curTsBlock.GetColumn(index).GetBinary(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		return bytesToHexString(v.values), nil
+	case DATE:
+		v, err := s.curTsBlock.GetColumn(index).GetInt(s.tsBlockIndex)
+		if err != nil {
+			return "", err
+		}
+		t, err := Int32ToDate(v)
+		if err != nil {
+			return "", err
+		}
+		return t.Format("2006-01-02"), nil
 	}
-
- s.columnCount = 0
- s.sessionId = -1
- s.queryId = -1
- s.rowsIndex = -1
- s.queryDataSet = nil
- s.sql = ""
- s.fetchSize = 0
- s.columnNameList = nil
- s.columnTypeList = nil
- s.columnOrdinalMap = nil
- s.columnTypeDeduplicatedList = nil
- s.currentBitmap = nil
- s.time = nil
- s.values = nil
- s.client = nil
- s.emptyResultSet = true
- s.closed = true
- return err
+	return "", nil
 }
-func NewIoTDBRpcDataSet(sql string, columnNameList []string, columnTypes
[]string,
- columnNameIndex map[string]int32,
- queryId int64, client *rpc.IClientRPCServiceClient, sessionId int64,
queryDataSet *rpc.TSQueryDataSet,
- ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64)
*IoTDBRpcDataSet {
-
- ds := &IoTDBRpcDataSet{
- sql: sql,
- columnNameList: columnNameList,
- ignoreTimeStamp: ignoreTimeStamp,
- queryId: queryId,
- client: client,
- sessionId: sessionId,
- queryDataSet: queryDataSet,
- fetchSize: fetchSize,
- currentBitmap: make([]byte, len(columnNameList)),
- values: make([][]byte, len(columnTypes)),
- columnCount: len(columnNameList),
- closed: false,
- timeoutMs: timeoutMs,
- }
-
- ds.columnTypeList = make([]TSDataType, 0)
-
- // deduplicate and map
- ds.columnOrdinalMap = make(map[string]int32)
- if !ignoreTimeStamp {
- ds.columnOrdinalMap[TimestampColumnName] = 1
+// findColumnNameByIndex returns the entry of columnNameList addressed by the
+// 1-based columnIndex. It returns an error when columnIndex is below 1 or
+// greater than the number of entries in columnNameList.
+func (s *IoTDBRpcDataSet) findColumnNameByIndex(columnIndex int32) (string, error) {
+	total := len(s.columnNameList)
+	switch {
+	case columnIndex <= 0:
+		return "", fmt.Errorf("column index should start from 1")
+	case columnIndex > int32(total):
+		return "", fmt.Errorf("column index %d out of range %d", columnIndex, total)
 	}
+	return s.columnNameList[columnIndex-1], nil
+}
- if columnNameIndex != nil {
- ds.columnTypeDeduplicatedList = make([]TSDataType,
len(columnNameIndex))
- for i, name := range columnNameList {
- columnTypeString := columnTypes[i]
- columnDataType := tsTypeMap[columnTypeString]
- ds.columnTypeList = append(ds.columnTypeList,
columnDataType)
- if _, exists := ds.columnOrdinalMap[name]; !exists {
- index := columnNameIndex[name]
- ds.columnOrdinalMap[name] = index + startIndex
- ds.columnTypeDeduplicatedList[index] =
tsTypeMap[columnTypeString]
- }
- }
- } else {
- ds.columnTypeDeduplicatedList = make([]TSDataType,
ds.columnCount)
- index := startIndex
- for i := 0; i < len(columnNameList); i++ {
- name := columnNameList[i]
- dataType := tsTypeMap[columnTypes[i]]
- ds.columnTypeList = append(ds.columnTypeList, dataType)
- ds.columnTypeDeduplicatedList[i] = dataType
- if _, exists := ds.columnOrdinalMap[name]; !exists {
- ds.columnOrdinalMap[name] = int32(index)
- index++
- }
- }
+func (s *IoTDBRpcDataSet) checkRecord() (err error) {
+ if s.queryResultIndex > s.queryResultSize || s.tsBlockIndex >=
s.tsBlockSize || s.queryResult == nil || s.curTsBlock == nil {
+ err = fmt.Errorf("no record remains")
}
- return ds
+ return err
}
diff --git a/client/rpcdataset_test.go b/client/rpcdataset_test.go
deleted file mode 100644
index 82dd3b0..0000000
--- a/client/rpcdataset_test.go
+++ /dev/null
@@ -1,666 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package client
-
-import (
- "bytes"
- "reflect"
- "testing"
- "time"
-
- "github.com/apache/iotdb-client-go/rpc"
-)
-
-func createIoTDBRpcDataSet() *IoTDBRpcDataSet {
- columns := []string{
- "root.ln.device1.restart_count",
- "root.ln.device1.price",
- "root.ln.device1.tick_count",
- "root.ln.device1.temperature",
- "root.ln.device1.description",
- "root.ln.device1.status",
- "root.ln.device1.description_string",
- "root.ln.device1.description_blob",
- "root.ln.device1.date",
- "root.ln.device1.ts",
- }
- dataTypes := []string{"INT32", "DOUBLE", "INT64", "FLOAT", "TEXT",
"BOOLEAN", "STRING", "BLOB", "DATE", "TIMESTAMP"}
- columnNameIndex := map[string]int32{
- "root.ln.device1.restart_count": 2,
- "root.ln.device1.price": 1,
- "root.ln.device1.tick_count": 5,
- "root.ln.device1.temperature": 4,
- "root.ln.device1.description": 0,
- "root.ln.device1.status": 3,
- "root.ln.device1.description_string": 6,
- "root.ln.device1.description_blob": 7,
- "root.ln.device1.date": 8,
- "root.ln.device1.ts": 9,
- }
- var queyrId int64 = 1
- var sessionId int64 = 1
- var client *rpc.IClientRPCServiceClient = nil
- queryDataSet := rpc.TSQueryDataSet{
- Time: []byte{0, 0, 1, 118, 76, 52, 0, 236, 0, 0, 1, 118, 76,
52, 25, 228, 0, 0, 1, 118, 76, 52, 41, 42, 0, 0, 1, 118, 76, 52, 243, 148, 0,
0, 1, 118, 76, 95, 98, 255},
- ValueList: [][]byte{
- {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105,
99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99,
101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101,
32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49,
0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49},
- {64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16,
204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205, 64, 159, 16,
204, 204, 204, 204, 205, 64, 159, 16, 204, 204, 204, 204, 205},
- {0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0, 0, 1, 0, 0,
0, 1},
- {1, 1, 1, 1, 1},
- {65, 65, 153, 154, 65, 65, 153, 154, 65, 65, 153, 154,
65, 65, 153, 154, 65, 65, 153, 154},
- {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220,
213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0,
50, 220, 213},
- {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105,
99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99,
101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101,
32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49,
0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49},
- {0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105,
99, 101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99,
101, 32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101,
32, 49, 0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49,
0, 0, 0, 13, 84, 101, 115, 116, 32, 68, 101, 118, 105, 99, 101, 32, 49},
- {1, 52, 216, 17, 1, 52, 216, 17, 1, 52, 216, 17, 1, 52,
216, 17, 1, 52, 216, 17},
- {0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220,
213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0, 50, 220, 213, 0, 0, 0, 0, 0,
50, 220, 213},
- },
- BitmapList: [][]byte{{248}, {248}, {248}, {248}, {248}, {248},
{248}, {248}, {248}, {248}},
- }
- return NewIoTDBRpcDataSet("select * from root.ln.device1", columns,
dataTypes, columnNameIndex, queyrId, client, sessionId, &queryDataSet, false,
DefaultFetchSize, nil)
-}
-
-func TestIoTDBRpcDataSet_getColumnType(t *testing.T) {
- type args struct {
- columnName string
- }
-
- ds := createIoTDBRpcDataSet()
- closedDataSet := createIoTDBRpcDataSet()
- closedDataSet.Close()
- tests := []struct {
- name string
- dataSet *IoTDBRpcDataSet
- args args
- want TSDataType
- }{
- {
- name: "Normal",
- dataSet: ds,
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: INT64,
- }, {
- name: "Closed",
- dataSet: closedDataSet,
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: UNKNOWN,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := tt.dataSet
- if got := s.getColumnType(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getColumnType() = %v,
want %v", got, tt.want)
- }
- s.Close()
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getColumnIndex(t *testing.T) {
- type args struct {
- columnName string
- }
- closedDataSet := createIoTDBRpcDataSet()
- closedDataSet.Close()
- tests := []struct {
- name string
- dataset *IoTDBRpcDataSet
- args args
- want int32
- }{
- {
- name: "Normal",
- dataset: createIoTDBRpcDataSet(),
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: 5,
- }, {
- name: "Closed",
- dataset: closedDataSet,
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: -1,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := tt.dataset
- if got := s.getColumnIndex(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getColumnIndex() =
%v, want %v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_isNull(t *testing.T) {
- type args struct {
- columnIndex int
- rowIndex int
- }
- ds := createIoTDBRpcDataSet()
- ds.next()
-
- tests := []struct {
- name string
- args args
- want bool
- }{
- {
- name: "Normal",
- args: args{
- columnIndex: 0,
- rowIndex: 0,
- },
- want: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.isNull(tt.args.columnIndex,
tt.args.rowIndex); got != tt.want {
- t.Errorf("IoTDBRpcDataSet.isNull() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getValue(t *testing.T) {
-
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want interface{}
- }{
- {
- name: "restart_count",
- args: args{
- columnName: "root.ln.device1.restart_count",
- },
- want: int32(1),
- }, {
- name: "tick_count",
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: int64(3333333),
- }, {
- name: "price",
- args: args{
- columnName: "root.ln.device1.price",
- },
- want: float64(1988.2),
- }, {
- name: "temperature",
- args: args{
- columnName: "root.ln.device1.temperature",
- },
- want: float32(12.1),
- }, {
- name: "description",
- args: args{
- columnName: "root.ln.device1.description",
- },
- want: "Test Device 1",
- }, {
- name: "status",
- args: args{
- columnName: "root.ln.device1.status",
- },
- want: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getValue(tt.args.columnName);
!reflect.DeepEqual(got, tt.want) {
- t.Errorf("IoTDBRpcDataSet.getValue() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_scan(t *testing.T) {
- type args struct {
- dest []interface{}
- }
-
- type want struct {
- err error
- values []interface{}
- }
-
- var restartCount int32
- var price float64
- var tickCount int64
- var temperature float32
- var description string
- var status bool
-
- var restartCountStr string
- var priceStr string
- var tickCountStr string
- var temperatureStr string
- var descriptionStr string
- var statusStr string
-
- var wantRestartCount int32 = 1
- var wantPrice float64 = 1988.2
- var wantTickCount int64 = 3333333
- var wantTemperature float32 = 12.1
- var wantDescription string = "Test Device 1"
- var wantStatus bool = true
-
- var wantRestartCountStr string = "1"
- var wantPriceStr string = "1988.2"
- var wantTickCountStr string = "3333333"
- var wantTemperatureStr string = "12.1"
- var wantDescriptionStr string = "Test Device 1"
- var wantStatusStr string = "true"
-
- tests := []struct {
- name string
- args args
- want want
- }{
- {
- name: "Normal",
- args: args{
- dest: []interface{}{&restartCount, &price,
&tickCount, &temperature, &description, &status},
- },
- want: want{
- err: nil,
- values: []interface{}{&wantRestartCount,
&wantPrice, &wantTickCount, &wantTemperature, &wantDescription, &wantStatus},
- },
- }, {
- name: "String",
- args: args{
- dest: []interface{}{&restartCountStr,
&priceStr, &tickCountStr, &temperatureStr, &descriptionStr, &statusStr},
- },
- want: want{
- err: nil,
- values: []interface{}{&wantRestartCountStr,
&wantPriceStr, &wantTickCountStr, &wantTemperatureStr, &wantDescriptionStr,
&wantStatusStr},
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if err := s.scan(tt.args.dest...); err != tt.want.err {
- t.Errorf("IoTDBRpcDataSet.scan() error = %v,
wantErr %v", err, tt.want.err)
- }
- if got := tt.args.dest; !reflect.DeepEqual(got,
tt.want.values) {
- t.Errorf("IoTDBRpcDataSet.scan(), dest = %v,
want %v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_GetTimestamp(t *testing.T) {
- tests := []struct {
- name string
- want int64
- }{
- {
- name: "GetTimestamp",
- want: 1607596245228,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.GetTimestamp(); got != tt.want {
- t.Errorf("IoTDBRpcDataSet.GetTimestamp() = %v,
want %v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getText(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want string
- }{
- {
- name: "restart_count",
- args: args{
- columnName: "root.ln.device1.restart_count",
- },
- want: "1",
- }, {
- name: "price",
- args: args{
- columnName: "root.ln.device1.price",
- },
- want: "1988.2",
- }, {
- name: "tick_count",
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: "3333333",
- }, {
- name: "temperature",
- args: args{
- columnName: "root.ln.device1.temperature",
- },
- want: "12.1",
- }, {
- name: "description",
- args: args{
- columnName: "root.ln.device1.description",
- },
- want: "Test Device 1",
- }, {
- name: "status",
- args: args{
- columnName: "root.ln.device1.status",
- },
- want: "true",
- }, {
- name: TimestampColumnName,
- args: args{
- columnName: TimestampColumnName,
- },
- want: time.Unix(0,
1607596245228000000).Format(time.RFC3339),
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getText(tt.args.columnName); got != tt.want
{
- t.Errorf("IoTDBRpcDataSet.getText() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getBool(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want bool
- }{
- {
- name: "status",
- args: args{
- columnName: "root.ln.device1.status",
- },
- want: true,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getBool(tt.args.columnName); got != tt.want
{
- t.Errorf("IoTDBRpcDataSet.getBool() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getFloat(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want float32
- }{
- {
- name: "temperature",
- args: args{
- columnName: "root.ln.device1.temperature",
- },
- want: 12.1,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getFloat(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getFloat() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getDouble(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want float64
- }{
- {
- name: "price",
- args: args{
- columnName: "root.ln.device1.price",
- },
- want: 1988.2,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getDouble(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getDouble() = %v,
want %v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getInt32(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want int32
- }{
- {
- name: "restart_count",
- args: args{
- columnName: "root.ln.device1.restart_count",
- },
- want: 1,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getInt32(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getInt32() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getInt64(t *testing.T) {
- type args struct {
- columnName string
- }
- tests := []struct {
- name string
- args args
- want int64
- }{
- {
- name: "tick_count",
- args: args{
- columnName: "root.ln.device1.tick_count",
- },
- want: 3333333,
- }, {
- name: TimestampColumnName,
- args: args{
- columnName: TimestampColumnName,
- },
- want: 1607596245228,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if got := s.getInt64(tt.args.columnName); got !=
tt.want {
- t.Errorf("IoTDBRpcDataSet.getInt64() = %v, want
%v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_getRowRecord(t *testing.T) {
- tests := []struct {
- name string
- want *RowRecord
- wantErr bool
- }{
- {
- name: "",
- want: &RowRecord{
- timestamp: 0,
- fields: []*Field{
- {
- name:
"root.ln.device1.restart_count",
- dataType: INT32,
- value: int32(1),
- }, {
- name:
"root.ln.device1.price",
- dataType: DOUBLE,
- value: float64(1988.2),
- }, {
- name:
"root.ln.device1.tick_count",
- dataType: INT64,
- value: int64(3333333),
- }, {
- name:
"root.ln.device1.temperature",
- dataType: FLOAT,
- value: float32(12.1),
- }, {
- name:
"root.ln.device1.description",
- dataType: TEXT,
- value: "Test Device 1",
- }, {
- name:
"root.ln.device1.status",
- dataType: BOOLEAN,
- value: true,
- }, {
- name:
"root.ln.device1.description_string",
- dataType: STRING,
- value: "Test Device 1",
- }, {
- name:
"root.ln.device1.description_blob",
- dataType: BLOB,
- value: []byte("Test Device
1"),
- }, {
- name:
"root.ln.device1.date",
- dataType: DATE,
- value: time.Date(2024,
time.April, 1, 0, 0, 0, 0, time.UTC),
- }, {
- name: "root.ln.device1.ts",
- dataType: TIMESTAMP,
- value: int64(3333333),
- },
- },
- },
- wantErr: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- got, err := s.getRowRecord()
- if (err != nil) != tt.wantErr {
- t.Errorf("IoTDBRpcDataSet.getRowRecord() error
= %v, wantErr %v", err, tt.wantErr)
- return
- }
-
- match := true
- for i := 0; i < len(got.fields); i++ {
- gotField := got.fields[i]
- wantField := tt.want.fields[i]
- if gotField.dataType != BLOB {
- if gotField.dataType !=
wantField.dataType || gotField.name != wantField.name || gotField.value !=
wantField.value {
- match = false
- }
- } else {
- if gotField.dataType !=
wantField.dataType || gotField.name != wantField.name ||
!bytes.Equal(gotField.value.([]byte), wantField.value.([]byte)) {
- match = false
- }
- }
- }
- if !match {
- t.Errorf("IoTDBRpcDataSet.getRowRecord() = %v,
want %v", got, tt.want)
- }
- })
- }
-}
-
-func TestIoTDBRpcDataSet_Close(t *testing.T) {
-
- tests := []struct {
- name string
- wantErr bool
- }{
- {
- name: "",
- wantErr: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- s := createIoTDBRpcDataSet()
- s.next()
- if err := s.Close(); (err != nil) != tt.wantErr {
- t.Errorf("IoTDBRpcDataSet.Close() error = %v,
wantErr %v", err, tt.wantErr)
- }
- })
- }
-}
diff --git a/client/session.go b/client/session.go
index ef65721..07023a0 100644
--- a/client/session.go
+++ b/client/session.go
@@ -423,13 +423,13 @@ func (s *Session) ExecuteStatementWithContext(ctx
context.Context, sql string) (
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
- resp, err := s.client.ExecuteStatement(ctx, &request)
+ resp, err := s.client.ExecuteStatementV2(ctx, &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
- resp, err = s.client.ExecuteStatement(ctx, &request)
+ resp, err = s.client.ExecuteStatementV2(ctx, &request)
}
}
@@ -437,7 +437,7 @@ func (s *Session) ExecuteStatementWithContext(ctx
context.Context, sql string) (
return nil, statusErr
}
- return s.genDataSet(sql, resp), err
+ return s.genDataSet(sql, resp)
}
func (s *Session) ExecuteStatement(sql string) (*SessionDataSet, error) {
@@ -451,13 +451,13 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
- resp, err := s.client.ExecuteStatement(context.Background(), &request)
+ resp, err := s.client.ExecuteStatementV2(context.Background(), &request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
- resp, err =
s.client.ExecuteStatement(context.Background(), &request)
+ resp, err =
s.client.ExecuteStatementV2(context.Background(), &request)
}
}
@@ -467,9 +467,9 @@ func (s *Session) ExecuteNonQueryStatement(sql string) (r
*common.TSStatus, err
func (s *Session) ExecuteQueryStatement(sql string, timeoutMs *int64)
(*SessionDataSet, error) {
request := rpc.TSExecuteStatementReq{SessionId: s.sessionId, Statement:
sql, StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
- if resp, err := s.client.ExecuteQueryStatement(context.Background(),
&request); err == nil {
+ if resp, err := s.client.ExecuteQueryStatementV2(context.Background(),
&request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
- return NewSessionDataSet(sql, resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet(sql, resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -477,9 +477,9 @@ func (s *Session) ExecuteQueryStatement(sql string,
timeoutMs *int64) (*SessionD
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
- resp, err =
s.client.ExecuteQueryStatement(context.Background(), &request)
+ resp, err =
s.client.ExecuteQueryStatementV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr
== nil {
- return NewSessionDataSet(sql, resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet(sql, resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -494,18 +494,18 @@ func (s *Session) ExecuteAggregationQuery(paths []string,
aggregations []common.
request := rpc.TSAggregationQueryReq{SessionId: s.sessionId,
StatementId: s.requestStatementId, Paths: paths,
Aggregations: aggregations, StartTime: startTime, EndTime:
endTime, Interval: interval, FetchSize: &s.config.FetchSize, Timeout: timeoutMs}
- if resp, err := s.client.ExecuteAggregationQuery(context.Background(),
&request); err == nil {
+ if resp, err :=
s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
} else {
if s.reconnect() {
request.SessionId = s.sessionId
- resp, err =
s.client.ExecuteAggregationQuery(context.Background(), &request)
+ resp, err =
s.client.ExecuteAggregationQueryV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr
== nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -521,18 +521,18 @@ func (s *Session)
ExecuteAggregationQueryWithLegalNodes(paths []string, aggregat
request := rpc.TSAggregationQueryReq{SessionId: s.sessionId,
StatementId: s.requestStatementId, Paths: paths,
Aggregations: aggregations, StartTime: startTime, EndTime:
endTime, Interval: interval, FetchSize: &s.config.FetchSize,
Timeout: timeoutMs, LegalPathNodes: legalNodes}
- if resp, err := s.client.ExecuteAggregationQuery(context.Background(),
&request); err == nil {
+ if resp, err :=
s.client.ExecuteAggregationQueryV2(context.Background(), &request); err == nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
} else {
if s.reconnect() {
request.SessionId = s.sessionId
- resp, err =
s.client.ExecuteAggregationQuery(context.Background(), &request)
+ resp, err =
s.client.ExecuteAggregationQueryV2(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr
== nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -550,7 +550,7 @@ func (s *Session) ExecuteGroupByQueryIntervalQuery(database
*string, device, mea
Timeout: timeoutMs, IsAligned: isAligned}
if resp, err :=
s.client.ExecuteGroupByQueryIntervalQuery(context.Background(), &request); err
== nil {
if statusErr := VerifySuccess(resp.Status); statusErr == nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -559,7 +559,7 @@ func (s *Session) ExecuteGroupByQueryIntervalQuery(database
*string, device, mea
request.SessionId = s.sessionId
resp, err =
s.client.ExecuteGroupByQueryIntervalQuery(context.Background(), &request)
if statusErr := VerifySuccess(resp.Status); statusErr
== nil {
- return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId, s.client,
s.sessionId, resp.QueryDataSet, resp.IgnoreTimeStamp != nil &&
*resp.IgnoreTimeStamp, s.config.FetchSize, timeoutMs), err
+ return NewSessionDataSet("", resp.Columns,
resp.DataTypeList, resp.ColumnNameIndexMap, *resp.QueryId,
s.requestStatementId, s.client, s.sessionId, resp.QueryResult_,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, timeoutMs,
*resp.MoreData, s.config.FetchSize)
} else {
return nil, statusErr
}
@@ -852,17 +852,17 @@ func (s *Session) ExecuteRawDataQuery(paths []string,
startTime int64, endTime i
EndTime: endTime,
StatementId: s.requestStatementId,
}
- resp, err := s.client.ExecuteRawDataQuery(context.Background(),
&request)
+ resp, err := s.client.ExecuteRawDataQueryV2(context.Background(),
&request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
- resp, err =
s.client.ExecuteRawDataQuery(context.Background(), &request)
+ resp, err =
s.client.ExecuteRawDataQueryV2(context.Background(), &request)
}
}
- return s.genDataSet("", resp), err
+ return s.genDataSet("", resp)
}
func (s *Session) ExecuteUpdateStatement(sql string) (*SessionDataSet, error) {
@@ -872,28 +872,32 @@ func (s *Session) ExecuteUpdateStatement(sql string)
(*SessionDataSet, error) {
StatementId: s.requestStatementId,
FetchSize: &s.config.FetchSize,
}
- resp, err := s.client.ExecuteUpdateStatement(context.Background(),
&request)
+ resp, err := s.client.ExecuteUpdateStatementV2(context.Background(),
&request)
if err != nil && resp == nil {
if s.reconnect() {
request.SessionId = s.sessionId
request.StatementId = s.requestStatementId
- resp, err =
s.client.ExecuteUpdateStatement(context.Background(), &request)
+ resp, err =
s.client.ExecuteUpdateStatementV2(context.Background(), &request)
}
}
- return s.genDataSet(sql, resp), err
+ return s.genDataSet(sql, resp)
}
-func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp)
*SessionDataSet {
+func (s *Session) genDataSet(sql string, resp *rpc.TSExecuteStatementResp)
(*SessionDataSet, error) {
var queryId int64
if resp.QueryId == nil {
queryId = 0
} else {
queryId = *resp.QueryId
}
+ moreData := false
+ if resp.MoreData != nil {
+ moreData = *resp.MoreData
+ }
return NewSessionDataSet(sql, resp.Columns, resp.DataTypeList,
resp.ColumnNameIndexMap,
- queryId, s.client, s.sessionId, resp.QueryDataSet,
resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, s.config.FetchSize, nil)
+ queryId, s.requestStatementId, s.client, s.sessionId,
resp.QueryResult_, resp.IgnoreTimeStamp != nil && *resp.IgnoreTimeStamp, nil,
moreData, s.config.FetchSize)
}
func (s *Session) genInsertTabletsReq(tablets []*Tablet, isAligned bool)
(*rpc.TSInsertTabletsReq, error) {
@@ -1028,7 +1032,7 @@ func valuesToBytes(dataTypes []TSDataType, values
[]interface{}) ([]byte, error)
case DATE:
switch s := v.(type) {
case time.Time:
- date, err := dateToInt32(s)
+ date, err := DateToInt32(s)
if err != nil {
return nil, err
}
@@ -1236,3 +1240,7 @@ func (s *Session) reconnect() bool {
}
return connectedSuccess
}
+
+func (s *Session) SetFetchSize(fetchSize int32) {
+ s.config.FetchSize = fetchSize
+}
diff --git a/client/sessiondataset.go b/client/sessiondataset.go
index b1fafd4..d177a44 100644
--- a/client/sessiondataset.go
+++ b/client/sessiondataset.go
@@ -1,128 +1,126 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
package client
import (
"github.com/apache/iotdb-client-go/rpc"
-)
-
-const (
- TimestampColumnName = "Time"
+ "time"
)
type SessionDataSet struct {
ioTDBRpcDataSet *IoTDBRpcDataSet
}
-// Next prepares the next result row for reading,
-// returns true on success, or false if there is no next result row or an error
-// appened while preparing it.
-// consulted Err should be consulted to distinguish between the two cases.
-// This is not goroutine safe
+func NewSessionDataSet(sql string, columnNameList []string, columnTypeList
[]string, columnNameIndex map[string]int32, queryId int64, statementId int64,
client *rpc.IClientRPCServiceClient, sessionId int64, queryResult [][]byte,
ignoreTimestamp bool, timeout *int64, moreData bool, fetchSize int32)
(*SessionDataSet, error) {
+ rpcDataSet, err := NewIoTDBRpcDataSet(sql, columnNameList,
columnTypeList, columnNameIndex, ignoreTimestamp, moreData, queryId,
statementId, client, sessionId, queryResult, fetchSize, timeout)
+ if err != nil {
+ return nil, err
+ }
+ return &SessionDataSet{ioTDBRpcDataSet: rpcDataSet}, nil
+}
+
func (s *SessionDataSet) Next() (bool, error) {
- return s.ioTDBRpcDataSet.next()
+ return s.ioTDBRpcDataSet.Next()
}
-// GetText returns string value of column value on row.
-// This is not goroutine safe
-func (s *SessionDataSet) GetText(columnName string) string {
- return s.ioTDBRpcDataSet.getText(columnName)
+func (s *SessionDataSet) Close() error {
+ return s.ioTDBRpcDataSet.Close()
}
-func (s *SessionDataSet) IsNull(columnName string) bool {
- return s.ioTDBRpcDataSet.isNullWithColumnName(columnName)
+func (s *SessionDataSet) IsNull(columnName string) (bool, error) {
+ return s.ioTDBRpcDataSet.isNullByColumnName(columnName), nil
}
-func (s *SessionDataSet) GetBool(columnName string) bool {
- return s.ioTDBRpcDataSet.getBool(columnName)
+func (s *SessionDataSet) IsNullByIndex(columnIndex int32) (bool, error) {
+ return s.ioTDBRpcDataSet.isNullByIndex(columnIndex)
}
-func (s *SessionDataSet) Scan(dest ...interface{}) error {
- return s.ioTDBRpcDataSet.scan(dest...)
+func (s *SessionDataSet) GetBooleanByIndex(columnIndex int32) (bool, error) {
+ return s.ioTDBRpcDataSet.getBooleanByIndex(columnIndex)
}
-func (s *SessionDataSet) GetFloat(columnName string) float32 {
- return s.ioTDBRpcDataSet.getFloat(columnName)
+func (s *SessionDataSet) GetBoolean(columnName string) (bool, error) {
+ return s.ioTDBRpcDataSet.getBoolean(columnName)
+}
+
+func (s *SessionDataSet) GetDoubleByIndex(columnIndex int32) (float64, error) {
+ return s.ioTDBRpcDataSet.getDoubleByIndex(columnIndex)
}
-func (s *SessionDataSet) GetDouble(columnName string) float64 {
+func (s *SessionDataSet) GetDouble(columnName string) (float64, error) {
return s.ioTDBRpcDataSet.getDouble(columnName)
}
-func (s *SessionDataSet) GetInt32(columnName string) int32 {
- return s.ioTDBRpcDataSet.getInt32(columnName)
+func (s *SessionDataSet) GetFloatByIndex(columnIndex int32) (float32, error) {
+ return s.ioTDBRpcDataSet.getFloatByIndex(columnIndex)
}
-func (s *SessionDataSet) GetInt64(columnName string) int64 {
- return s.ioTDBRpcDataSet.getInt64(columnName)
+func (s *SessionDataSet) GetFloat(columnName string) (float32, error) {
+ return s.ioTDBRpcDataSet.getFloat(columnName)
}
-func (s *SessionDataSet) GetTimestamp() int64 {
- return s.ioTDBRpcDataSet.GetTimestamp()
+func (s *SessionDataSet) GetIntByIndex(columnIndex int32) (int32, error) {
+ return s.ioTDBRpcDataSet.getIntByIndex(columnIndex)
}
-func (s *SessionDataSet) GetValue(columnName string) interface{} {
- return s.ioTDBRpcDataSet.getValue(columnName)
+func (s *SessionDataSet) GetInt(columnName string) (int32, error) {
+ return s.ioTDBRpcDataSet.getInt(columnName)
}
-func (s *SessionDataSet) GetRowRecord() (*RowRecord, error) {
- return s.ioTDBRpcDataSet.getRowRecord()
+func (s *SessionDataSet) GetLongByIndex(columnIndex int32) (int64, error) {
+ return s.ioTDBRpcDataSet.getLongByIndex(columnIndex)
}
-func (s *SessionDataSet) GetColumnCount() int {
- return s.ioTDBRpcDataSet.columnCount
+func (s *SessionDataSet) GetLong(columnName string) (int64, error) {
+ return s.ioTDBRpcDataSet.getLong(columnName)
}
-func (s *SessionDataSet) GetColumnDataType(columnIndex int) TSDataType {
- return s.ioTDBRpcDataSet.columnTypeList[columnIndex]
+func (s *SessionDataSet) GetObjectByIndex(columnIndex int32) (interface{},
error) {
+ return s.ioTDBRpcDataSet.getObjectByIndex(columnIndex)
}
-func (s *SessionDataSet) GetColumnName(columnIndex int) string {
- return s.ioTDBRpcDataSet.columnNameList[columnIndex]
+func (s *SessionDataSet) GetObject(columnName string) (interface{}, error) {
+ return s.ioTDBRpcDataSet.getObject(columnName)
}
-func (s *SessionDataSet) GetColumnNames() []string {
- return s.ioTDBRpcDataSet.columnNameList
+func (s *SessionDataSet) GetStringByIndex(columnIndex int32) (string, error) {
+ return s.ioTDBRpcDataSet.getStringByIndex(columnIndex)
}
-func (s *SessionDataSet) IsIgnoreTimeStamp() bool {
- return s.ioTDBRpcDataSet.ignoreTimeStamp
+func (s *SessionDataSet) GetString(columnName string) (string, error) {
+ return s.ioTDBRpcDataSet.getValueByName(columnName)
}
-func (s *SessionDataSet) IsClosed() bool {
- return s.ioTDBRpcDataSet.IsClosed()
+func (s *SessionDataSet) GetTimestampByIndex(columnIndex int32) (time.Time,
error) {
+ return s.ioTDBRpcDataSet.getTimestampByIndex(columnIndex)
}
-func (s *SessionDataSet) Close() error {
- return s.ioTDBRpcDataSet.Close()
+func (s *SessionDataSet) GetTimestamp(columnName string) (time.Time, error) {
+ return s.ioTDBRpcDataSet.getTimestamp(columnName)
}
-func NewSessionDataSet(sql string, columnNameList []string, columnTypeList
[]string,
- columnNameIndex map[string]int32,
- queryId int64, client *rpc.IClientRPCServiceClient, sessionId int64,
queryDataSet *rpc.TSQueryDataSet,
- ignoreTimeStamp bool, fetchSize int32, timeoutMs *int64)
*SessionDataSet {
+func (s *SessionDataSet) GetDateByIndex(columnIndex int32) (time.Time, error) {
+ return s.ioTDBRpcDataSet.GetDateByIndex(columnIndex)
+}
- return &SessionDataSet{
- ioTDBRpcDataSet: NewIoTDBRpcDataSet(sql, columnNameList,
columnTypeList,
- columnNameIndex,
- queryId, client, sessionId, queryDataSet,
- ignoreTimeStamp, fetchSize, timeoutMs),
- }
+func (s *SessionDataSet) GetDate(columnName string) (time.Time, error) {
+ return s.ioTDBRpcDataSet.GetDate(columnName)
+}
+
+func (s *SessionDataSet) GetBlobByIndex(columnIndex int32) (*Binary, error) {
+ return s.ioTDBRpcDataSet.getBinaryByIndex(columnIndex)
+}
+
+func (s *SessionDataSet) GetBlob(columnName string) (*Binary, error) {
+ return s.ioTDBRpcDataSet.getBinary(columnName)
+}
+
+func (s *SessionDataSet) FindColumn(columnName string) int32 {
+ return s.ioTDBRpcDataSet.findColumn(columnName)
+}
+
+func (s *SessionDataSet) GetColumnNames() []string {
+ return s.ioTDBRpcDataSet.columnNameList
+}
+
+func (s *SessionDataSet) GetColumnTypes() []string {
+ return s.ioTDBRpcDataSet.columnTypeList
}
diff --git a/client/tablet.go b/client/tablet.go
index 7b62f89..2268018 100644
--- a/client/tablet.go
+++ b/client/tablet.go
@@ -195,7 +195,7 @@ func (t *Tablet) SetValueAt(value interface{}, columnIndex,
rowIndex int) error
values := t.values[columnIndex].([]int32)
switch v := value.(type) {
case time.Time:
- val, err := dateToInt32(v)
+ val, err := DateToInt32(v)
if err != nil {
return err
}
@@ -241,7 +241,7 @@ func (t *Tablet) GetValueAt(columnIndex, rowIndex int)
(interface{}, error) {
case BLOB:
return t.values[columnIndex].([][]byte)[rowIndex], nil
case DATE:
- return int32ToDate(t.values[columnIndex].([]int32)[rowIndex])
+ return Int32ToDate(t.values[columnIndex].([]int32)[rowIndex])
default:
return nil, fmt.Errorf("illegal datatype %v", schema.DataType)
}
diff --git a/client/tsblock.go b/client/tsblock.go
new file mode 100644
index 0000000..b9365e7
--- /dev/null
+++ b/client/tsblock.go
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package client
+
+import (
+ "bytes"
+ "encoding/binary"
+ "fmt"
+)
+
+type TsBlock struct {
+ timeColumn *TimeColumn
+ valueColumns []Column
+ positionCount int32
+}
+
+func NewTsBlock(positionCount int32, timeColumn *TimeColumn, valueColumns
...Column) (*TsBlock, error) {
+ if valueColumns == nil {
+ return nil, fmt.Errorf("blocks is null")
+ }
+ return &TsBlock{
+ timeColumn: timeColumn,
+ valueColumns: valueColumns,
+ positionCount: positionCount,
+ }, nil
+}
+
+func DeserializeTsBlock(data []byte) (*TsBlock, error) {
+ // Serialized tsblock:
+ // +-------------+---------------+---------+------------+-----------+----------+
+ // | val col cnt | val col types | pos cnt | encodings  | time col  | val col  |
+ // +-------------+---------------+---------+------------+-----------+----------+
+ // | int32       | list[byte]    | int32   | list[byte] | bytes     | bytes    |
+ // +-------------+---------------+---------+------------+-----------+----------+
+
+ reader := bytes.NewReader(data)
+ // value column count
+ var valueColumnCount int32
+ if err := binary.Read(reader, binary.BigEndian, &valueColumnCount); err
!= nil {
+ return nil, err
+ }
+
+ // value column data types
+ valueColumnDataTypes := make([]TSDataType, valueColumnCount)
+ for i := int32(0); i < valueColumnCount; i++ {
+ dataType, err := deserializeDataType(reader)
+ if err != nil {
+ return nil, err
+ }
+ valueColumnDataTypes[i] = dataType
+ }
+
+ // position count
+ var positionCount int32
+ if err := binary.Read(reader, binary.BigEndian, &positionCount); err !=
nil {
+ return nil, err
+ }
+
+ // column encodings
+ columnEncodings := make([]ColumnEncoding, valueColumnCount+1)
+ for i := int32(0); i < valueColumnCount+1; i++ {
+ columnEncoding, err := deserializeColumnEncoding(reader)
+ if err != nil {
+ return nil, err
+ }
+ columnEncodings[i] = columnEncoding
+ }
+
+ // time column
+ timeColumnDecoder, err := getColumnDecoder(columnEncodings[0])
+ if err != nil {
+ return nil, err
+ }
+ timeColumn, err := timeColumnDecoder.ReadTimeColumn(reader,
positionCount)
+ if err != nil {
+ return nil, err
+ }
+
+ // value columns
+ valueColumns := make([]Column, valueColumnCount)
+ for i := int32(0); i < valueColumnCount; i++ {
+ valueColumnDecoder, err :=
getColumnDecoder(columnEncodings[i+1])
+ if err != nil {
+ return nil, err
+ }
+ valueColumn, err := valueColumnDecoder.ReadColumn(reader,
valueColumnDataTypes[i], positionCount)
+ if err != nil {
+ return nil, err
+ }
+ valueColumns[i] = valueColumn
+ }
+ return NewTsBlock(positionCount, timeColumn, valueColumns...)
+}
+
+func deserializeDataType(reader *bytes.Reader) (TSDataType, error) {
+ b, err := reader.ReadByte()
+ if err != nil {
+ return UNKNOWN, err
+ }
+ return getDataTypeByByte(b)
+}
+
+func deserializeColumnEncoding(reader *bytes.Reader) (ColumnEncoding, error) {
+ b, err := reader.ReadByte()
+ if err != nil {
+ return RLE_COLUMN_ENCODING, err
+ }
+ return getColumnEncodingByByte(b)
+}
+
+func (t *TsBlock) GetPositionCount() int32 {
+ return t.positionCount
+}
+
+func (t *TsBlock) GetStartTime() int64 {
+ return t.timeColumn.GetStartTime()
+}
+
+func (t *TsBlock) GetEndTime() int64 {
+ return t.timeColumn.GetEndTime()
+}
+
+func (t *TsBlock) IsEmpty() bool {
+ return t.positionCount == 0
+}
+
+func (t *TsBlock) GetTimeByIndex(index int32) (int64, error) {
+ return t.timeColumn.GetLong(index)
+}
+
+func (t *TsBlock) GetValueColumnCount() int32 {
+ return int32(len(t.valueColumns))
+}
+
+func (t *TsBlock) GetTimeColumn() Column {
+ return t.timeColumn
+}
+
+func (t *TsBlock) GetValueColumns() *[]Column {
+ return &t.valueColumns
+}
+
+func (t *TsBlock) GetColumn(columnIndex int32) Column {
+ return t.valueColumns[columnIndex]
+}
diff --git a/client/utils.go b/client/utils.go
index 692e9a5..41cc783 100644
--- a/client/utils.go
+++ b/client/utils.go
@@ -81,7 +81,7 @@ func bytesToHexString(input []byte) string {
return hexString
}
-func dateToInt32(localDate time.Time) (int32, error) {
+func DateToInt32(localDate time.Time) (int32, error) {
if localDate.IsZero() {
return 0, errors.New("date expression is null or empty")
}
@@ -96,7 +96,7 @@ func dateToInt32(localDate time.Time) (int32, error) {
return int32(result), nil
}
-func int32ToDate(val int32) (time.Time, error) {
+func Int32ToDate(val int32) (time.Time, error) {
date := int(val)
year := date / 10000
month := (date / 100) % 100
@@ -112,7 +112,7 @@ func int32ToDate(val int32) (time.Time, error) {
}
func bytesToDate(bys []byte) (time.Time, error) {
- return int32ToDate(bytesToInt32(bys))
+ return Int32ToDate(bytesToInt32(bys))
}
func verifySuccesses(statuses []*common.TSStatus) error {
@@ -149,3 +149,19 @@ func VerifySuccess(status *common.TSStatus) error {
}
return nil
}
+
+type Binary struct {
+ values []byte
+}
+
+func NewBinary(v []byte) *Binary {
+ return &Binary{v}
+}
+
+func (b *Binary) GetStringValue() string {
+ return string(b.values)
+}
+
+func (b *Binary) GetValues() []byte {
+ return b.values
+}
diff --git a/example/session_example.go b/example/session_example.go
index 32026c1..1b31514 100644
--- a/example/session_example.go
+++ b/example/session_example.go
@@ -162,41 +162,22 @@ func connectCluster() {
}
func printDevice1(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
for _, columnName := range sds.GetColumnNames() {
fmt.Printf("%s\t", columnName)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
-
- var restartCount int32
- var price float64
- var tickCount int64
- var temperature float32
- var description string
- var status bool
-
- // All of iotdb datatypes can be scan into string variables
- // var restartCount string
- // var price string
- // var tickCount string
- // var temperature string
- // var description string
- // var status string
-
- if err := sds.Scan(&restartCount, &tickCount, &price,
&temperature, &description, &status); err != nil {
- log.Fatal(err)
- }
+ timestamp, _ := sds.GetStringByIndex(1)
+ restartCount, _ := sds.GetIntByIndex(2)
+ tickCount, _ := sds.GetLongByIndex(3)
+ price, _ := sds.GetDoubleByIndex(4)
+ temperature, _ := sds.GetFloatByIndex(5)
+ description, _ := sds.GetStringByIndex(6)
+ status, _ := sds.GetBooleanByIndex(7)
whitespace := "\t\t"
+ fmt.Printf("%s\t", timestamp)
fmt.Printf("%v%s", restartCount, whitespace)
fmt.Printf("%v%s", price, whitespace)
fmt.Printf("%v%s", tickCount, whitespace)
@@ -209,35 +190,34 @@ func printDevice1(sds *client.SessionDataSet) {
}
func printDataSet0(sessionDataSet *client.SessionDataSet) {
- showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+ columns := sessionDataSet.GetColumnNames()
+ for _, columnName := range columns {
+ fmt.Printf("%s\t", columnName)
}
fmt.Println()
for next, err := sessionDataSet.Next(); err == nil && next; next, err =
sessionDataSet.Next() {
- if showTimestamp {
- fmt.Printf("%s\t",
sessionDataSet.GetText(client.TimestampColumnName))
- }
- for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
- columnName := sessionDataSet.GetColumnName(i)
- switch sessionDataSet.GetColumnDataType(i) {
+ for i, columnName := range columns {
+ dataType, _ :=
client.GetDataTypeByStr(sessionDataSet.GetColumnTypes()[i])
+ switch dataType {
case client.BOOLEAN:
- fmt.Print(sessionDataSet.GetBool(columnName))
+ value, _ :=
sessionDataSet.GetBoolean(columnName)
+ fmt.Print(value)
case client.INT32:
- fmt.Print(sessionDataSet.GetInt32(columnName))
+ value, _ := sessionDataSet.GetInt(columnName)
+ fmt.Print(value)
case client.INT64, client.TIMESTAMP:
- fmt.Print(sessionDataSet.GetInt64(columnName))
+ value, _ := sessionDataSet.GetLong(columnName)
+ fmt.Print(value)
case client.FLOAT:
- fmt.Print(sessionDataSet.GetFloat(columnName))
+ value, _ := sessionDataSet.GetFloat(columnName)
+ fmt.Print(value)
case client.DOUBLE:
- fmt.Print(sessionDataSet.GetDouble(columnName))
+ value, _ := sessionDataSet.GetDouble(columnName)
+ fmt.Print(value)
case client.TEXT, client.STRING, client.BLOB,
client.DATE:
- fmt.Print(sessionDataSet.GetText(columnName))
+ value, _ := sessionDataSet.GetString(columnName)
+ fmt.Print(value)
default:
}
fmt.Print("\t\t")
@@ -247,58 +227,46 @@ func printDataSet0(sessionDataSet *client.SessionDataSet)
{
}
func printDataSet1(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sds.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sds.GetColumnName(i))
+ columnNames := sds.GetColumnNames()
+ for _, value := range columnNames {
+ fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
- for i := 0; i < sds.GetColumnCount(); i++ {
- columnName := sds.GetColumnName(i)
- v := sds.GetValue(columnName)
- if v == nil {
- v = "null"
+ for _, columnName := range columnNames {
+ isNull, _ := sds.IsNull(columnName)
+
+ if isNull {
+ fmt.Printf("%v\t\t", "null")
+ } else {
+ v, _ := sds.GetString(columnName)
+ fmt.Printf("%v\t\t", v)
}
- fmt.Printf("%v\t\t", v)
}
fmt.Println()
}
}
func printDataSet2(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sds.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sds.GetColumnName(i))
+ columnNames := sds.GetColumnNames()
+ for _, value := range columnNames {
+ fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
+ for i := int32(0); i < int32(len(columnNames)); i++ {
+ isNull, _ := sds.IsNullByIndex(i)
- if record, err := sds.GetRowRecord(); err == nil {
- for _, field := range record.GetFields() {
- v := field.GetValue()
- if field.IsNull() {
- v = "null"
- }
+ if isNull {
+ fmt.Printf("%v\t\t", "null")
+ } else {
+ v, _ := sds.GetStringByIndex(i)
fmt.Printf("%v\t\t", v)
}
- fmt.Println()
}
+ fmt.Println()
}
}
@@ -641,7 +609,7 @@ func executeAggregationQueryStatementWithLegalNodes(paths
[]string, aggregations
}
func executeRawDataQuery() {
- session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5)
values(1,true)")
+ session.ExecuteNonQueryStatement("insert into
root.ln.wf02.wt02(time,s5) values(1,true)")
var (
paths = []string{"root.ln.wf02.wt02.s5"}
startTime int64 = 1
diff --git a/example/session_pool/session_pool_example.go
b/example/session_pool/session_pool_example.go
index 459393b..6f52ffe 100644
--- a/example/session_pool/session_pool_example.go
+++ b/example/session_pool/session_pool_example.go
@@ -26,7 +26,6 @@ import (
"log"
"math/rand"
"strings"
- "sync"
"time"
"github.com/apache/iotdb-client-go/client"
@@ -55,18 +54,6 @@ func main() {
sessionPool = client.NewSessionPool(config, 3, 60000, 60000, false)
defer sessionPool.Close()
- var wg sync.WaitGroup
- for i := 0; i < 10000; i++ {
- var j = i
- wg.Add(1)
- go func() {
- defer wg.Done()
- setStorageGroup(fmt.Sprintf("root.ln-%d", j))
- deleteStorageGroup(fmt.Sprintf("root.ln-%d", j))
-
- }()
-
- }
//useNodeUrls()
setStorageGroup("root.ln1")
setStorageGroup("root.ln2")
@@ -136,7 +123,6 @@ func main() {
insertAlignedTablets()
deleteTimeseries("root.ln.device1.*")
executeQueryStatement("show timeseries root.**")
- wg.Wait()
}
@@ -604,7 +590,7 @@ func executeRawDataQuery() {
log.Print(err)
return
}
- session.ExecuteUpdateStatement("insert into root.ln.wf02.wt02(time,s5)
values(1,true)")
+ session.ExecuteNonQueryStatement("insert into
root.ln.wf02.wt02(time,s5) values(1,true)")
var (
paths []string = []string{"root.ln.wf02.wt02.s5"}
startTime int64 = 1
@@ -631,41 +617,22 @@ func executeBatchStatement() {
}
func printDevice1(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
for _, columnName := range sds.GetColumnNames() {
fmt.Printf("%s\t", columnName)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
-
- var restartCount int32
- var price float64
- var tickCount int64
- var temperature float32
- var description string
- var status bool
-
- // All of iotdb datatypes can be scan into string variables
- // var restartCount string
- // var price string
- // var tickCount string
- // var temperature string
- // var description string
- // var status string
-
- if err := sds.Scan(&restartCount, &tickCount, &price,
&temperature, &description, &status); err != nil {
- log.Fatal(err)
- }
+ timestamp, _ := sds.GetStringByIndex(1)
+ restartCount, _ := sds.GetIntByIndex(2)
+ tickCount, _ := sds.GetLongByIndex(3)
+ price, _ := sds.GetDoubleByIndex(4)
+ temperature, _ := sds.GetFloatByIndex(5)
+ description, _ := sds.GetStringByIndex(6)
+ status, _ := sds.GetBooleanByIndex(7)
whitespace := "\t\t"
+ fmt.Printf("%s\t", timestamp)
fmt.Printf("%v%s", restartCount, whitespace)
fmt.Printf("%v%s", price, whitespace)
fmt.Printf("%v%s", tickCount, whitespace)
@@ -678,35 +645,34 @@ func printDevice1(sds *client.SessionDataSet) {
}
func printDataSet0(sessionDataSet *client.SessionDataSet) {
- showTimestamp := !sessionDataSet.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sessionDataSet.GetColumnName(i))
+ columns := sessionDataSet.GetColumnNames()
+ for _, columnName := range columns {
+ fmt.Printf("%s\t", columnName)
}
fmt.Println()
for next, err := sessionDataSet.Next(); err == nil && next; next, err =
sessionDataSet.Next() {
- if showTimestamp {
- fmt.Printf("%s\t",
sessionDataSet.GetText(client.TimestampColumnName))
- }
- for i := 0; i < sessionDataSet.GetColumnCount(); i++ {
- columnName := sessionDataSet.GetColumnName(i)
- switch sessionDataSet.GetColumnDataType(i) {
+ for i, columnName := range columns {
+ dataType, _ :=
client.GetDataTypeByStr(sessionDataSet.GetColumnTypes()[i])
+ switch dataType {
case client.BOOLEAN:
- fmt.Print(sessionDataSet.GetBool(columnName))
+ value, _ :=
sessionDataSet.GetBoolean(columnName)
+ fmt.Print(value)
case client.INT32:
- fmt.Print(sessionDataSet.GetInt32(columnName))
- case client.INT64:
- fmt.Print(sessionDataSet.GetInt64(columnName))
+ value, _ := sessionDataSet.GetInt(columnName)
+ fmt.Print(value)
+ case client.INT64, client.TIMESTAMP:
+ value, _ := sessionDataSet.GetLong(columnName)
+ fmt.Print(value)
case client.FLOAT:
- fmt.Print(sessionDataSet.GetFloat(columnName))
+ value, _ := sessionDataSet.GetFloat(columnName)
+ fmt.Print(value)
case client.DOUBLE:
- fmt.Print(sessionDataSet.GetDouble(columnName))
- case client.TEXT:
- fmt.Print(sessionDataSet.GetText(columnName))
+ value, _ := sessionDataSet.GetDouble(columnName)
+ fmt.Print(value)
+ case client.TEXT, client.STRING, client.BLOB,
client.DATE:
+ value, _ := sessionDataSet.GetString(columnName)
+ fmt.Print(value)
default:
}
fmt.Print("\t\t")
@@ -716,58 +682,46 @@ func printDataSet0(sessionDataSet *client.SessionDataSet)
{
}
func printDataSet1(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sds.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sds.GetColumnName(i))
+ columnNames := sds.GetColumnNames()
+ for _, value := range columnNames {
+ fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
- for i := 0; i < sds.GetColumnCount(); i++ {
- columnName := sds.GetColumnName(i)
- v := sds.GetValue(columnName)
- if v == nil {
- v = "null"
+ for _, columnName := range columnNames {
+ isNull, _ := sds.IsNull(columnName)
+
+ if isNull {
+ fmt.Printf("%v\t\t", "null")
+ } else {
+ v, _ := sds.GetString(columnName)
+ fmt.Printf("%v\t\t", v)
}
- fmt.Printf("%v\t\t", v)
}
fmt.Println()
}
}
func printDataSet2(sds *client.SessionDataSet) {
- showTimestamp := !sds.IsIgnoreTimeStamp()
- if showTimestamp {
- fmt.Print("Time\t\t\t\t")
- }
-
- for i := 0; i < sds.GetColumnCount(); i++ {
- fmt.Printf("%s\t", sds.GetColumnName(i))
+ columnNames := sds.GetColumnNames()
+ for _, value := range columnNames {
+ fmt.Printf("%s\t", value)
}
fmt.Println()
for next, err := sds.Next(); err == nil && next; next, err = sds.Next()
{
- if showTimestamp {
- fmt.Printf("%s\t",
sds.GetText(client.TimestampColumnName))
- }
+ for i := int32(0); i < int32(len(columnNames)); i++ {
+ isNull, _ := sds.IsNullByIndex(i)
- if record, err := sds.GetRowRecord(); err == nil {
- for _, field := range record.GetFields() {
- v := field.GetValue()
- if field.IsNull() {
- v = "null"
- }
+ if isNull {
+ fmt.Printf("%v\t\t", "null")
+ } else {
+ v, _ := sds.GetStringByIndex(i)
fmt.Printf("%v\t\t", v)
}
- fmt.Println()
}
+ fmt.Println()
}
}
diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go
index 16eaf77..faf204e 100644
--- a/test/e2e/e2e_test.go
+++ b/test/e2e/e2e_test.go
@@ -77,6 +77,11 @@ func (s *e2eTestSuite) checkError(status *common.TSStatus,
err error) {
}
}
+func (s *e2eTestSuite) Test_NonQuery() {
+ _, err := s.session.ExecuteStatement("flush")
+ s.Require().NoError(err)
+}
+
func (s *e2eTestSuite) Test_WrongURL() {
clusterConfig := client.ClusterConfig{
NodeUrls: strings.Split("iotdb1:6667", ","),
@@ -102,8 +107,8 @@ func (s *e2eTestSuite) Test_CreateTimeseries() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var timeseries string
- assert.NoError(ds.Scan(&timeseries))
+ timeseries, err := ds.GetStringByIndex(1)
+ assert.NoError(err)
assert.Equal(timeseries, "root.tsg1.dev1.status")
}
@@ -135,8 +140,8 @@ func (s *e2eTestSuite) Test_CreateAlignedTimeseries() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var timeseries string
- assert.NoError(ds.Scan(&timeseries))
+ timeseries, err := ds.GetStringByIndex(1)
+ assert.NoError(err)
assert.Equal(timeseries, fullPath)
}
}
@@ -156,8 +161,8 @@ func (s *e2eTestSuite) Test_InsertRecords() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
+ status, err := ds.GetString("root.tsg1.dev1.status")
+ assert.NoError(err)
assert.Equal(status, "Working")
}
@@ -176,9 +181,9 @@ func (s *e2eTestSuite) Test_InsertAlignedRecord() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "Working")
+ status, err := ds.GetString("root.tsg2.dev1.status")
+ assert.NoError(err)
+ assert.Equal("Working", status)
}
func (s *e2eTestSuite) Test_InsertAlignedRecords() {
@@ -195,8 +200,8 @@ func (s *e2eTestSuite) Test_InsertAlignedRecords() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var temperature string
- assert.NoError(ds.Scan(&temperature))
+ temperature, err := ds.GetString("root.al1.dev3.temperature")
+ assert.NoError(err)
assert.Equal(temperature, "44")
}
@@ -227,10 +232,11 @@ func (s *e2eTestSuite)
Test_InsertAlignedRecordsOfOneDevice() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
- assert.Equal(status, "2024-04-01")
+ date, err := ds.GetString("root.al1.dev4.date")
+ assert.NoError(err)
+ assert.Equal("2024-04-01", date)
}
+
func (s *e2eTestSuite) Test_InsertAlignedTablet() {
var timeseries = []string{"root.ln.device1.**"}
s.session.DeleteTimeseries(timeseries)
@@ -247,8 +253,8 @@ func (s *e2eTestSuite) Test_InsertAlignedTablet() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
+ status, err := ds.GetStringByIndex(1)
+ assert.NoError(err)
assert.Equal(status, "12")
s.session.DeleteStorageGroup("root.ln.**")
}
@@ -269,8 +275,8 @@ func (s *e2eTestSuite)
Test_InsertAlignedTabletWithNilValue() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
+ status, err := ds.GetStringByIndex(1)
+ assert.NoError(err)
assert.Equal(status, "12")
s.session.DeleteStorageGroup("root.ln.**")
}
@@ -389,12 +395,265 @@ func (s *e2eTestSuite) Test_InsertAlignedTablets() {
assert.NoError(err)
defer ds.Close()
assert.True(ds.Next())
- var status string
- assert.NoError(ds.Scan(&status))
+ status, err := ds.GetStringByIndex(1)
+ assert.NoError(err)
assert.Equal(status, "8")
s.session.DeleteStorageGroup("root.ln.**")
}
+func (s *e2eTestSuite) Test_FetchMoreData() {
+ var timeseries = []string{"root.ln.device1.**"}
+ s.session.SetFetchSize(1000)
+ s.session.DeleteTimeseries(timeseries)
+ writeCount := 10000
+ tablet1, err := createTablet(writeCount)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ tablets := []*client.Tablet{tablet1}
+ s.checkError(s.session.InsertAlignedTablets(tablets, false))
+
+ ds, err := s.session.ExecuteQueryStatement("select * from
root.ln.device1", nil)
+ count := 0
+ for {
+ if hasNext, err := ds.Next(); err != nil || !hasNext {
+ break
+ }
+ count += 1
+ }
+ s.Assert().Equal(writeCount, count)
+ s.session.DeleteStorageGroup("root.ln.**")
+}
+
+func (s *e2eTestSuite) Test_QueryAllDataType() {
+ measurementSchemas := []*client.MeasurementSchema{
+ {
+ Measurement: "s0",
+ DataType: client.BOOLEAN,
+ },
+ {
+ Measurement: "s1",
+ DataType: client.INT32,
+ },
+ {
+ Measurement: "s2",
+ DataType: client.INT64,
+ },
+ {
+ Measurement: "s3",
+ DataType: client.FLOAT,
+ },
+ {
+ Measurement: "s4",
+ DataType: client.DOUBLE,
+ },
+ {
+ Measurement: "s5",
+ DataType: client.TEXT,
+ },
+ {
+ Measurement: "s6",
+ DataType: client.TIMESTAMP,
+ },
+ {
+ Measurement: "s7",
+ DataType: client.DATE,
+ },
+ {
+ Measurement: "s8",
+ DataType: client.BLOB,
+ },
+ {
+ Measurement: "s9",
+ DataType: client.STRING,
+ },
+ }
+ tablet, err := client.NewTablet("root.tsg1.d1", measurementSchemas, 100)
+ s.NoError(err)
+ tablet.SetTimestamp(1, 0)
+ tablet.SetValueAt(true, 0, 0)
+ tablet.SetValueAt(int32(1), 1, 0)
+ tablet.SetValueAt(int64(1), 2, 0)
+ tablet.SetValueAt(float32(1), 3, 0)
+ tablet.SetValueAt(float64(1), 4, 0)
+ tablet.SetValueAt("text", 5, 0)
+ tablet.SetValueAt(int64(1), 6, 0)
+ expectedDate, _ := client.Int32ToDate(20250326)
+ tablet.SetValueAt(expectedDate, 7, 0)
+ tablet.SetValueAt([]byte{1}, 8, 0)
+ tablet.SetValueAt("string", 9, 0)
+ tablet.RowSize = 1
+
+ r, err := s.session.InsertAlignedTablet(tablet, true)
+ s.checkError(r, err)
+
+ sessionDataSet, err := s.session.ExecuteQueryStatement("select s0, s1,
s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil)
+ for {
+ if hasNext, err := sessionDataSet.Next(); err != nil ||
!hasNext {
+ break
+ }
+ for _, columnName := range sessionDataSet.GetColumnNames() {
+ isNull, err := sessionDataSet.IsNull(columnName)
+ s.NoError(err)
+ s.False(isNull)
+ }
+ timeValue, err := sessionDataSet.GetLongByIndex(1)
+ s.NoError(err)
+ s.Equal(int64(1), timeValue)
+ boolValue, err := sessionDataSet.GetBooleanByIndex(2)
+ s.NoError(err)
+ s.Equal(true, boolValue)
+
+ intValue, err := sessionDataSet.GetIntByIndex(3)
+ s.NoError(err)
+ s.Equal(int32(1), intValue)
+
+ longValue, err := sessionDataSet.GetLongByIndex(4)
+ s.NoError(err)
+ s.Equal(int64(1), longValue)
+
+ floatValue, err := sessionDataSet.GetFloatByIndex(5)
+ s.NoError(err)
+ s.Equal(float32(1), floatValue)
+
+ doubleValue, err := sessionDataSet.GetDoubleByIndex(6)
+ s.NoError(err)
+ s.Equal(float64(1), doubleValue)
+
+ textValue, err := sessionDataSet.GetStringByIndex(7)
+ s.NoError(err)
+ s.Equal("text", textValue)
+
+ timestampValue, err := sessionDataSet.GetTimestampByIndex(8)
+ s.NoError(err)
+ s.Equal(time.Unix(0, 1e6), timestampValue)
+
+ dateValue, err := sessionDataSet.GetDateByIndex(9)
+ s.NoError(err)
+ s.Equal(expectedDate, dateValue)
+
+ blobValue, err := sessionDataSet.GetBlobByIndex(10)
+ s.NoError(err)
+ s.Equal([]byte{1}, blobValue.GetValues())
+
+ stringValue, err := sessionDataSet.GetStringByIndex(11)
+ s.NoError(err)
+ s.Equal("string", stringValue)
+ }
+ sessionDataSet.Close()
+
+ sessionDataSet, err = s.session.ExecuteQueryStatement("select s0, s1,
s2, s3, s4, s5, s6, s7, s8, s9 from root.tsg1.d1 limit 1", nil)
+ for {
+ if hasNext, err := sessionDataSet.Next(); err != nil ||
!hasNext {
+ break
+ }
+ for _, columnName := range sessionDataSet.GetColumnNames() {
+ isNull, err := sessionDataSet.IsNull(columnName)
+ s.NoError(err)
+ s.False(isNull)
+ }
+ timeValue, err := sessionDataSet.GetLong("Time")
+ s.NoError(err)
+ s.Equal(int64(1), timeValue)
+
+ boolValue, err := sessionDataSet.GetBoolean("root.tsg1.d1.s0")
+ s.NoError(err)
+ s.Equal(true, boolValue)
+
+ intValue, err := sessionDataSet.GetInt("root.tsg1.d1.s1")
+ s.NoError(err)
+ s.Equal(int32(1), intValue)
+
+ longValue, err := sessionDataSet.GetLong("root.tsg1.d1.s2")
+ s.NoError(err)
+ s.Equal(int64(1), longValue)
+
+ floatValue, err := sessionDataSet.GetFloat("root.tsg1.d1.s3")
+ s.NoError(err)
+ s.Equal(float32(1), floatValue)
+
+ doubleValue, err := sessionDataSet.GetDouble("root.tsg1.d1.s4")
+ s.NoError(err)
+ s.Equal(float64(1), doubleValue)
+
+ textValue, err := sessionDataSet.GetString("root.tsg1.d1.s5")
+ s.NoError(err)
+ s.Equal("text", textValue)
+
+ timestampValue, err :=
sessionDataSet.GetTimestamp("root.tsg1.d1.s6")
+ s.NoError(err)
+ s.Equal(time.Unix(0, 1e6), timestampValue)
+
+ dateValue, err := sessionDataSet.GetDate("root.tsg1.d1.s7")
+ s.NoError(err)
+ s.Equal(expectedDate, dateValue)
+
+ blobValue, err := sessionDataSet.GetBlob("root.tsg1.d1.s8")
+ s.NoError(err)
+ s.Equal([]byte{1}, blobValue.GetValues())
+
+ stringValue, err := sessionDataSet.GetString("root.tsg1.d1.s9")
+ s.NoError(err)
+ s.Equal("string", stringValue)
+ }
+ sessionDataSet.Close()
+
+ sessionDataSet, err = s.session.ExecuteQueryStatement("select * from
root.tsg1.d1 limit 1", nil)
+ for {
+ if hasNext, err := sessionDataSet.Next(); err != nil ||
!hasNext {
+ break
+ }
+ for _, columnName := range sessionDataSet.GetColumnNames() {
+ isNull, err := sessionDataSet.IsNull(columnName)
+ s.NoError(err)
+ s.False(isNull)
+ }
+ timeValue, err := sessionDataSet.GetObject("Time")
+ s.NoError(err)
+ s.Equal(time.Unix(0, 1*1e6), timeValue)
+
+ boolValue, err := sessionDataSet.GetObject("root.tsg1.d1.s0")
+ s.NoError(err)
+ s.Equal(true, boolValue)
+
+ intValue, err := sessionDataSet.GetObject("root.tsg1.d1.s1")
+ s.NoError(err)
+ s.Equal(int32(1), intValue)
+
+ longValue, err := sessionDataSet.GetObject("root.tsg1.d1.s2")
+ s.NoError(err)
+ s.Equal(int64(1), longValue)
+
+ floatValue, err := sessionDataSet.GetObject("root.tsg1.d1.s3")
+ s.NoError(err)
+ s.Equal(float32(1), floatValue)
+
+ doubleValue, err := sessionDataSet.GetObject("root.tsg1.d1.s4")
+ s.NoError(err)
+ s.Equal(float64(1), doubleValue)
+
+ textValue, err := sessionDataSet.GetObject("root.tsg1.d1.s5")
+ s.NoError(err)
+ s.Equal("text", textValue.(*client.Binary).GetStringValue())
+
+ timestampValue, err :=
sessionDataSet.GetObject("root.tsg1.d1.s6")
+ s.NoError(err)
+ s.Equal(int64(1), timestampValue)
+
+ dateValue, err := sessionDataSet.GetObject("root.tsg1.d1.s7")
+ s.NoError(err)
+ s.Equal(int32(20250326), dateValue)
+
+ blobValue, err := sessionDataSet.GetObject("root.tsg1.d1.s8")
+ s.NoError(err)
+ s.Equal([]byte{1}, blobValue.(*client.Binary).GetValues())
+
+ stringValue, err := sessionDataSet.GetObject("root.tsg1.d1.s9")
+ s.NoError(err)
+ s.Equal("string", stringValue.(*client.Binary).GetStringValue())
+ }
+}
func (s *e2eTestSuite) Test_InvalidSQL() {
_, err := s.session.ExecuteStatementWithContext(context.Background(),
"select1 from device")
assert := s.Require()