This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b323b78  [IOTDB-921] Solve the statement twice call bug (#1784)
b323b78 is described below

commit b323b78a87c8fa4942e8bc7ebf9af18bdf87a89f
Author: Qi Yu <[email protected]>
AuthorDate: Mon Oct 19 20:31:31 2020 +0800

    [IOTDB-921] Solve the statement twice call bug (#1784)
---
 .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 64 +++++++++++++++++++++-
 .../iotdb/db/integration/IoTDBSimpleQueryIT.java   | 49 +++++++++++++++++
 .../iotdb/tsfile/utils/ReadWriteIOUtils.java       |  8 +++
 3 files changed, 119 insertions(+), 2 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java 
b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
index 60f9021..f4b7502 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java
@@ -28,12 +28,19 @@ import 
org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq;
 import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
 import org.apache.iotdb.service.rpc.thrift.TSIService;
+import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
+import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 import org.apache.thrift.TException;
+
+import java.nio.ByteBuffer;
 import java.sql.*;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class IoTDBStatement implements Statement {
 
@@ -221,6 +228,8 @@ public class IoTDBStatement implements Statement {
     } catch (StatementExecutionException e) {
       throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
     }
+
+    deepCopyResp(execResp);
     if (execResp.isSetColumns()) {
       queryId = execResp.getQueryId();
       if (execResp.queryDataSet == null) {
@@ -319,12 +328,16 @@ public class IoTDBStatement implements Statement {
     } catch (StatementExecutionException e) {
       throw new IoTDBSQLException(e.getMessage(), execResp.getStatus());
     }
+
+    //Because diffent resultSet share the same TTransport and buffer, if the 
former has not comsumed
+    //result timely, the latter will overlap the former byte buffer, thus 
problem will occur
+    deepCopyResp(execResp);
+
     if (execResp.queryDataSet == null) {
       this.resultSet = new IoTDBNonAlignJDBCResultSet(this, 
execResp.getColumns(),
           execResp.getDataTypeList(), execResp.columnNameIndexMap, 
execResp.ignoreTimeStamp, client, sql, queryId,
           sessionId, execResp.nonAlignQueryDataSet);
-    }
-    else {
+    } else {
       this.resultSet = new IoTDBJDBCResultSet(this, execResp.getColumns(),
           execResp.getDataTypeList(), execResp.columnNameIndexMap, 
execResp.ignoreTimeStamp, client, sql, queryId,
           sessionId, execResp.queryDataSet);
@@ -332,6 +345,53 @@ public class IoTDBStatement implements Statement {
     return resultSet;
   }
 
+  private void deepCopyResp(TSExecuteStatementResp queryRes) {
+    final TSQueryDataSet tsQueryDataSet = queryRes.getQueryDataSet();
+    final TSQueryNonAlignDataSet nonAlignDataSet = 
queryRes.getNonAlignQueryDataSet();
+
+    if (Objects.nonNull(tsQueryDataSet)) {
+      deepCopyTsQueryDataSet(tsQueryDataSet);
+    } else {
+      deepCopyNonAlignQueryDataSet(nonAlignDataSet);
+    }
+  }
+
+  private void deepCopyNonAlignQueryDataSet(TSQueryNonAlignDataSet 
nonAlignDataSet) {
+    if (Objects.isNull(nonAlignDataSet)) {
+      return;
+    }
+
+    final List<ByteBuffer> valueList = nonAlignDataSet.valueList
+            .stream()
+            .map(ReadWriteIOUtils::clone)
+            .collect(Collectors.toList());
+
+    final List<ByteBuffer> timeList = nonAlignDataSet.timeList
+            .stream()
+            .map(ReadWriteIOUtils::clone)
+            .collect(Collectors.toList());
+
+    nonAlignDataSet.setTimeList(timeList);
+    nonAlignDataSet.setValueList(valueList);
+  }
+
+  private void deepCopyTsQueryDataSet(TSQueryDataSet tsQueryDataSet) {
+    final ByteBuffer time = ReadWriteIOUtils.clone(tsQueryDataSet.time);
+    final List<ByteBuffer> valueList = tsQueryDataSet.valueList
+            .stream()
+            .map(ReadWriteIOUtils::clone)
+            .collect(Collectors.toList());
+
+    final List<ByteBuffer> bitmapList = tsQueryDataSet.bitmapList
+            .stream()
+            .map(ReadWriteIOUtils::clone)
+            .collect(Collectors.toList());
+
+    tsQueryDataSet.setBitmapList(bitmapList);
+    tsQueryDataSet.setValueList(valueList);
+    tsQueryDataSet.setTime(time);
+  }
+
   @Override
   public int executeUpdate(String sql) throws SQLException {
     checkConnection("execute update");
diff --git 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
index 340ed25..b75197d 100644
--- 
a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
+++ 
b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSimpleQueryIT.java
@@ -43,6 +43,12 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
 public class IoTDBSimpleQueryIT {
 
   @Before
@@ -607,4 +613,47 @@ public class IoTDBSimpleQueryIT {
       fail();
     }
   }
+
+  @Test
+  public void testUseSameStatement() throws SQLException {
+    try (Connection connection = 
DriverManager.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", 
"root", "root");
+         Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.sg1");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s0 WITH DATATYPE=INT64, 
ENCODING=RLE, COMPRESSOR=SNAPPY");
+      statement.execute("CREATE TIMESERIES root.sg1.d0.s1 WITH DATATYPE=INT64, 
ENCODING=RLE, COMPRESSOR=SNAPPY");
+      statement.execute("CREATE TIMESERIES root.sg1.d1.s0 WITH DATATYPE=INT64, 
ENCODING=RLE, COMPRESSOR=SNAPPY");
+      statement.execute("CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT64, 
ENCODING=RLE, COMPRESSOR=SNAPPY");
+
+      statement.execute("insert into root.sg1.d0(timestamp,s0,s1) 
values(1,1,1)");
+      statement.execute("insert into root.sg1.d1(timestamp,s0,s1) 
values(1000,1000,1000)");
+      statement.execute("insert into root.sg1.d0(timestamp,s0,s1) 
values(10,10,10)");
+
+
+      List<ResultSet> resultSetList = new ArrayList<>();
+
+      ResultSet r1 = statement.executeQuery("select * from root.sg1.d0 where 
time <= 1");
+      resultSetList.add(r1);
+
+      ResultSet r2 = statement.executeQuery("select * from root.sg1.d1 where 
s0 == 1000");
+      resultSetList.add(r2);
+
+      ResultSet r3 = statement.executeQuery("select * from root.sg1.d0 where 
s1 == 10");
+      resultSetList.add(r3);
+
+      r1.next();
+      Assert.assertEquals(r1.getLong(1), 1L);
+      Assert.assertEquals(r1.getLong(2), 1L);
+      Assert.assertEquals(r1.getLong(3), 1L);
+
+      r2.next();
+      Assert.assertEquals(r2.getLong(1), 1000L);
+      Assert.assertEquals(r2.getLong(2), 1000L);
+      Assert.assertEquals(r2.getLong(3), 1000L);
+
+      r3.next();
+      Assert.assertEquals(r3.getLong(1), 10L);
+      Assert.assertEquals(r3.getLong(2), 10L);
+      Assert.assertEquals(r3.getLong(3), 10L);
+    }
+  }
 }
diff --git 
a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java 
b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
index 2a25fef..c4f3a69 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/ReadWriteIOUtils.java
@@ -937,5 +937,13 @@ public class ReadWriteIOUtils {
     }
   }
 
+  public static ByteBuffer clone(ByteBuffer original) {
+    ByteBuffer clone = ByteBuffer.allocate(original.remaining());
+    while(original.hasRemaining()) {
+      clone.put(original.get());
+    }
 
+    clone.flip();
+    return clone;
+  }
 }

Reply via email to