Date: Monday, December 12, 2005 @ 14:52:33
  Author: gilles
    Path: /cvsroot/carob/carob

Modified: include/Connection.hpp (1.31 -> 1.32) src/Connection.cpp (1.36
          -> 1.37)

Updated to protocol version 29:
. RetrieveXXXResult commands and functions
. BeginTransactionIfNeeded changes
. fetchMultipleResultsFromStream function
. acknowledgedTransactionId support


------------------------+
 include/Connection.hpp |   64 ++++++++++++++
 src/Connection.cpp     |  204 ++++++++++++++++++++++++++++++++++-------------
 2 files changed, 210 insertions(+), 58 deletions(-)


Index: carob/include/Connection.hpp
diff -u carob/include/Connection.hpp:1.31 carob/include/Connection.hpp:1.32
--- carob/include/Connection.hpp:1.31   Thu Dec  8 15:18:37 2005
+++ carob/include/Connection.hpp        Mon Dec 12 14:52:33 2005
@@ -38,7 +38,7 @@
 /** Command prefix sent before each command */
 #define CommandPrefix                           0xB015CA
 
-#define ProtocolVersion                         27
+#define ProtocolVersion                         29
 
 //#define Ping                                    -1
 #define StatementExecuteQuery                   0
@@ -47,6 +47,9 @@
 //#define CallableStatementExecuteQuery           3
 //#define CallableStatementExecuteUpdate          4
 //#define CallableStatementExecute                5
+#define RetrieveExecuteQueryResult              10
+#define RetrieveExecuteUpdateResult             11
+#define RetrieveExecuteResult                   12
 #define StatementExecute                        6
 #define Begin                                   20
 #define Commit                                  21
@@ -342,9 +345,22 @@
   /** Has a write request been executed in the current transaction? */
   bool                writeExecutedInTransaction;
   /** Transaction identifier */
-  int32_t             transactionId;
+  int64_t             transactionId;
   /** Function-specific mutexes */
   CriticalSection     connectionCS;
+  /**
+   * Flag to check if a new transaction must be started before executing any
+   * statement
+   */
+  bool                mustBeginTransaction;
+
+  /**
+   * Begins a new transaction if needed (<code>mustBeginTransaction</code> is
+   * set to <code>true</code>).
+   */
+  void                beginTransactionIfNeeded() throw (SocketIOException,
+                          BackendException, ControllerException, 
+                          ProtocolException, UnexpectedException);
 
   /**
    * Set the autocommit mode and read-only status on this request.
@@ -423,6 +439,50 @@
   DriverResultSet*    receiveResultSet() throw (SocketIOException,
                           ProtocolException, NotImplementedException,
                           UnexpectedException);
+  /**
+   * Checks if the given query already executed or not on the controller we are
+   * currently connected to.
+   *
+   * @param request the request to check
+   * @return ResultSet or null
+   * @throws DriverSQLException if an error occurs
+   */
+  DriverResultSet*    retrieveExecuteQueryResult(const Request &request)
+                          throw (SocketIOException, BackendException,
+                          ControllerException, ProtocolException,
+                          UnexpectedException);
+  /**
+   * Check if the given query already executed or not on the controller we are
+   * currently connected to.
+   * 
+   * @param request the request to check
+   * @return int the number of updated rows or -1 if not found
+   * @throws DriverSQLException if an error occurs
+   */
+  int32_t             retrieveExecuteUpdateResult(const Request &request)
+                          throw (SocketIOException, BackendException,
+                          ControllerException, ProtocolException,
+                          UnexpectedException);
+  /**
+   * Check if the given query already executed or not on the controller we are
+   * currently connected to.
+   * 
+   * @param request the request to check
+   * @return a <code>List</code> of ResultSetOrUpdateCount
+   * @throws DriverSQLException if an error occurs
+   */
+  std::list<ResultSetOrUpdateCount> retrieveExecuteResult(const Request &request)
+                          throw (SocketIOException, BackendException,
+                          ControllerException, ProtocolException,
+                          UnexpectedException);
+  /**
+   * Fetches multiple results from a query executed with Statement.execute()
+   * @return the list of results
+   */
+  std::list<ResultSetOrUpdateCount> fetchMultipleResultsFromStream()
+                          throw (SocketIOException, BackendException,
+                          ControllerException, ProtocolException,
+                          UnexpectedException);
 };
 } //namespace CarobNS
 #endif //_CONNECTION_H_
Index: carob/src/Connection.cpp
diff -u carob/src/Connection.cpp:1.36 carob/src/Connection.cpp:1.37
--- carob/src/Connection.cpp:1.36       Thu Dec  8 15:18:37 2005
+++ carob/src/Connection.cpp    Mon Dec 12 14:52:33 2005
@@ -16,7 +16,7 @@
  * limitations under the License.
  *
  * Initial developer(s): Gilles Rayrat
- * Contributor(s): 
+ * Contributor(s): Marc Herbert
  */
 
 #include "Connection.hpp"
@@ -42,7 +42,8 @@
     isClosed(true),
     autoCommit(true),
     readOnly(false),
-    writeExecutedInTransaction(false)
+    writeExecutedInTransaction(false),
+    mustBeginTransaction(false)
 {
   wstring fctName(L"Connection::Connection");
   isClosed = true;
@@ -200,7 +201,7 @@
   {
     wstring msg(fctName + L" Authentication failed. Error ");
     if (!authenticatedRead)
-      msg += L"while reading controller authenticated";
+      msg += L"while reading controller authenticated ack";
     else if (!needsSkelRead)
       msg += L"while reading need of sql skeleton";
     else
@@ -234,6 +235,7 @@
       if (isDebugEnabled())
         logDebug(fctName, L"Resetting connection and adding it to the pool");
       autoCommit = true;
+      mustBeginTransaction = false;
       readOnly = false;
       sendCommand(*driverSocketPtr, Reset);
       //TODO: Pool connection !!!
@@ -312,11 +314,23 @@
   wstring fctName(L"Connection::setAutoCommit");
 
   connectionCS.Enter();
-
+  
+  if (mustBeginTransaction) // so this.autoCommit is false
+  {
+    if (autoCommitPrm)
+    { // Just cancel the need to start a transaction
+      autoCommit = true;
+      mustBeginTransaction = false;
+      return;
+    }
+  }
+    
+  // nothing to change
   if (autoCommit == autoCommitPrm)
   {
     return;
   }
+  // autocommit false -> true
   if (autoCommitPrm)
   {
     //TODO: Should we try/catch ?
@@ -330,11 +344,15 @@
     autoCommit = true;
     return;
   }
-  else
+  else // autocommit true -> false
   {
-    sendCommand(*driverSocketPtr, Begin);
+    
+    mustBeginTransaction = true;
+
+    // send the BEGIN right now (unlike pgjdbc & others)
+    beginTransactionIfNeeded();
+    
 
-    transactionId = receiveLongOrException();
     autoCommit = false;
 
     if (isDebugEnabled())
@@ -345,6 +363,17 @@
   connectionCS.Leave();
 }
 
+void Connection::beginTransactionIfNeeded() throw (SocketIOException,
+BackendException, ControllerException, ProtocolException, UnexpectedException)
+{ // Sends the deferred Begin command, if one is pending
+  if (mustBeginTransaction) // no-op unless a Begin is pending
+  {
+    sendCommand(*driverSocketPtr, Begin);
+    transactionId = receiveLongOrException(); // the reply to Begin is kept as the transaction id
+    mustBeginTransaction = false; // cleared only once the reply has been read
+  }
+}
+
 void Connection::setReadOnly(bool readOnlyPrm) throw (SocketIOException,
 DriverException, UnexpectedException)
 {
@@ -368,36 +397,31 @@
     connectionCS.Leave();
     throw DriverException(L"Trying to commit a connection in autocommit mode");
   }
-  int64_t firstTransactionId = transactionId;
   try
   {
     sendCommand(*driverSocketPtr, Commit);
-
-    // Commit is followed by a BEGIN
-    transactionId = receiveLongOrException();
-    writeExecutedInTransaction = false;
-    if (isDebugEnabled())
+    int64_t acknowledgedTransactionId = receiveLongOrException();
+    if (acknowledgedTransactionId != transactionId)
     {
-      logDebug(fctName,L"New transaction " + toWString(transactionId)
-          + L" has been started");
+      throw (DriverException(L"Protocol error during commit (acknowledge transaction ID = "
+              + toWString(acknowledgedTransactionId) + L", expected transaction ID = "
+              + toWString(transactionId) + L")"));
     }
+    // Commit must be followed by a BEGIN
+    mustBeginTransaction = true;
   }
   catch (SocketIOException e)
   {
     if (isWarningEnabled())
     {
       logWarning(fctName, L"I/O Error occured around commit of transaction '"
-          + toWString(firstTransactionId) + L"\n" + e.description());
+          + toWString(transactionId) + L"\n" + e.description());
     }
-    autoCommit = true;
-    transactionId = 0;
     connectionCS.Leave();
     throw;
   }
   catch (...)
   {
-    autoCommit = true;
-    transactionId = 0;
     connectionCS.Leave();
     throw;
   }
@@ -417,36 +441,31 @@
     throw DriverException(L"Trying to rollback a connection in autocommit mode");
   }
 
-  int64_t initialTransactionId = transactionId;
   try
   {
     sendCommand(*driverSocketPtr, Rollback);
+    int64_t acknowledgedTransactionId = receiveLongOrException();
+    if (acknowledgedTransactionId != transactionId)
+    {
+      throw (DriverException(L"Protocol error during rollback (acknowledge transaction ID = "
+              + toWString(acknowledgedTransactionId) + L", expected transaction ID = "
+              + toWString(transactionId) + L")"));
+    }
     // Rollback is followed by a BEGIN
-    transactionId = receiveLongOrException();
-    writeExecutedInTransaction = false;
-
-    if (isDebugEnabled())
-      logDebug(fctName, L"Transaction " + toWString(transactionId)
-          + L" has been started");
+    mustBeginTransaction = true;
   }
   catch (SocketIOException e)
   {
     if (isWarningEnabled())
     {
       logWarning(fctName, L"I/O Error occured around rollback of transaction '"
-          + toWString(initialTransactionId) + L"\n" + e.description());
+          + toWString(transactionId) + L"\n" + e.description());
     }
-    // Transaction has been aborted by the controller
-    autoCommit = true;
-    transactionId = 0;
     connectionCS.Leave();
-    throw DriverException(L"I/O Error occured around rollback of transaction '"
-                          + toWString(initialTransactionId)+ e.description());
+    throw;
   }
   catch (...)
   {
-    autoCommit = true;
-    transactionId = 0;
     connectionCS.Leave();
     throw;
   }
@@ -480,12 +499,13 @@
 {
   
   wstring fctName(L"Connection::statementExecuteQuery");
-  
+
   connectionCS.Enter();
 
   DriverResultSet* retVal = NULL;
   
   checkIfConnected();
+  beginTransactionIfNeeded();
 
   try
   {
@@ -532,12 +552,17 @@
   int32_t controllerResponse = -1;
   
   checkIfConnected();
+  beginTransactionIfNeeded();
+
+  bool requestIdIsSet = false;
 
   try
   {
     setConnectionParametersOnRequest(request);
     sendCommand(*driverSocketPtr, StatementExecuteUpdate);
     writeRequestOnStream(request);
+    request.setId(receiveLongOrException());
+    requestIdIsSet = true;
     controllerResponse = receiveIntOrException();
   }
   catch (SocketIOException sioe)
@@ -547,6 +572,18 @@
                   + L"). Connection probably lost";
     if (isErrorEnabled())
       logError(fctName, msg);
+
+    //TODO:
+    //reconnect()
+//    if (requestIdIsSet)
+//    { // Controller handled the query, check if it was executed
+//      int result = retrieveExecuteUpdateResult(request);
+//      if (result != -1)
+//        return result;
+//    }
+    // At this point the query failed before any controller succeeded in
+    // executing the query
+
     connectionCS.Leave();
     throw SocketIOException(msg);
     //in case exception is not catched
@@ -572,34 +609,22 @@
   std::list<ResultSetOrUpdateCount> results;
 
   checkIfConnected();
+  beginTransactionIfNeeded();
+
+  bool requestIdIsSet = false;
 
   try
   {
     setConnectionParametersOnRequest(request);
+    if (!autoCommit)
+      writeExecutedInTransaction = true;
     sendCommand(*driverSocketPtr, StatementExecute);
     request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
     if (isDebugEnabled())
       logDebug(fctName, L"Executing Statement.execute(" + (wstring)request + L")");
-
-    bool hasResult = false;
-    ResultSetOrUpdateCount resultReceived;
-    do
-    {
-      hasResult = receiveBoolOrException();
-      if (hasResult)
-      {
-        resultReceived.isResultSet = true;
-        resultReceived.resultSetPtr = receiveResultSet();
-      }
-      else
-      {
-        resultReceived.isResultSet = false;
-        resultReceived.updateCount = receiveIntOrException();
-      }
-      results.push_back(resultReceived);
-      
-    }
-    while (hasResult || resultReceived.updateCount != -1);
+    request.setId(receiveLongOrException());
+    requestIdIsSet = true;
+    results = fetchMultipleResultsFromStream();
   }
   catch (SocketIOException sioe)
   {
@@ -608,6 +633,19 @@
                   + L"). Connection probably lost";
     if (isErrorEnabled())
       logError(fctName, msg);
+    //TODO:
+//    reconnect();
+//    if (requestIdIsSet)
+//    { // Controller handled the query, check if it was executed
+//      List result = retrieveExecuteResult(request);
+//      if (result != null)
+//        return result;
+//    }
+//    // At this point the query failed before any controller succeeded in
+//    // executing the query
+//
+//    return statementExecute(request);
+  
     connectionCS.Leave();
     throw SocketIOException(msg);
     //in case exception is not catched
@@ -814,3 +852,57 @@
   //just to avoid compiler warnings
   return NULL;
 }
+
+DriverResultSet* Connection::retrieveExecuteQueryResult(const Request &request)
+    throw (SocketIOException, BackendException, ControllerException,
+    ProtocolException, UnexpectedException)
+{ // Asks the controller whether this query was already executed (see header doc)
+  sendCommand(*driverSocketPtr, RetrieveExecuteQueryResult); // command code 10 (Connection.hpp)
+  request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+  return receiveResultSet(); // header doc: "ResultSet or null"
+}
+
+int32_t Connection::retrieveExecuteUpdateResult(const Request &request)
+    throw (SocketIOException, BackendException, ControllerException,
+    ProtocolException, UnexpectedException)
+{ // Asks the controller whether this update was already executed (see header doc)
+  sendCommand(*driverSocketPtr, RetrieveExecuteUpdateResult); // command code 11 (Connection.hpp)
+  request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+  return receiveIntOrException(); // header doc: updated row count, or -1 if not found
+}
+std::list<ResultSetOrUpdateCount> Connection::retrieveExecuteResult(const Request &request)
+    throw (SocketIOException, BackendException, ControllerException,
+    ProtocolException, UnexpectedException)
+{ // Asks the controller whether this request was already executed (see header doc)
+  sendCommand(*driverSocketPtr, RetrieveExecuteResult); // command code 12 (Connection.hpp)
+  request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+  return fetchMultipleResultsFromStream(); // reply is parsed by the helper statementExecute also uses
+}
+
+std::list<ResultSetOrUpdateCount> Connection::fetchMultipleResultsFromStream()
+                        throw (SocketIOException, BackendException,
+                        ControllerException, ProtocolException,
+                        UnexpectedException)
+{ // Reads (flag, payload) pairs from the stream until an update count of -1 arrives
+  std::list<ResultSetOrUpdateCount> results; // filled in arrival order
+  bool hasResult = false;
+  ResultSetOrUpdateCount resultReceived; // reused for every entry; push_back stores a copy
+  do
+  {
+    hasResult = receiveBoolOrException(); // true: a result set follows; false: an update count follows
+    if (hasResult)
+    {
+      resultReceived.isResultSet = true;
+      resultReceived.resultSetPtr = receiveResultSet();
+    }
+    else
+    {
+      resultReceived.isResultSet = false;
+      resultReceived.updateCount = receiveIntOrException();
+    }
+    results.push_back(resultReceived); // appended before the loop test, so the final -1 entry is stored too
+    
+  }
+  while (hasResult || resultReceived.updateCount != -1); // updateCount is only examined when no result set was read
+  return results;
+}

_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits

Reply via email to