Date: Monday, December 12, 2005 @ 14:52:33
Author: gilles
Path: /cvsroot/carob/carob
Modified: include/Connection.hpp (1.31 -> 1.32) src/Connection.cpp (1.36
-> 1.37)
Updated to protocol version 29:
. RetrieveXXXResult commands and functions
. BeginTransactionIfNeeded changes
. fetchMultipleResultsFromStream function
. acknowledgedTransactionId support
------------------------+
include/Connection.hpp | 64 ++++++++++++++
src/Connection.cpp | 204 ++++++++++++++++++++++++++++++++++-------------
2 files changed, 210 insertions(+), 58 deletions(-)
Index: carob/include/Connection.hpp
diff -u carob/include/Connection.hpp:1.31 carob/include/Connection.hpp:1.32
--- carob/include/Connection.hpp:1.31 Thu Dec 8 15:18:37 2005
+++ carob/include/Connection.hpp Mon Dec 12 14:52:33 2005
@@ -38,7 +38,7 @@
/** Command prefix sent before each command */
#define CommandPrefix 0xB015CA
-#define ProtocolVersion 27
+#define ProtocolVersion 29
//#define Ping -1
#define StatementExecuteQuery 0
@@ -47,6 +47,9 @@
//#define CallableStatementExecuteQuery 3
//#define CallableStatementExecuteUpdate 4
//#define CallableStatementExecute 5
+#define RetrieveExecuteQueryResult 10
+#define RetrieveExecuteUpdateResult 11
+#define RetrieveExecuteResult 12
#define StatementExecute 6
#define Begin 20
#define Commit 21
@@ -342,9 +345,22 @@
/** Has a write request been executed in the current transaction? */
bool writeExecutedInTransaction;
/** Transaction identifier */
- int32_t transactionId;
+ int64_t transactionId;
/** Function-specific mutexes */
CriticalSection connectionCS;
+ /**
+ * Flag to check if a new transaction must be started before executing any
+ * statement
+ */
+ bool mustBeginTransaction;
+
+ /**
+ * Begins a new transaction if needed (<code>mustBeginTransaction</code> is
+ * set to <code>true</code>).
+ */
+ void beginTransactionIfNeeded() throw (SocketIOException,
+ BackendException, ControllerException,
+ ProtocolException, UnexpectedException);
/**
* Set the autocommit mode and read-only status on this request.
@@ -423,6 +439,50 @@
DriverResultSet* receiveResultSet() throw (SocketIOException,
ProtocolException, NotImplementedException,
UnexpectedException);
+ /**
+ * Checks if the given query already executed or not on the controller we are
+ * currently connected to.
+ *
+ * @param request the request to check
+ * @return ResultSet or null
+ * @throws DriverSQLException if an error occurs
+ */
+ DriverResultSet* retrieveExecuteQueryResult(const Request &request)
+ throw (SocketIOException, BackendException,
+ ControllerException, ProtocolException,
+ UnexpectedException);
+ /**
+ * Check if the given query already executed or not on the controller we are
+ * currently connected to.
+ *
+ * @param request the request to check
+ * @return int the number of updated rows or -1 if not found
+ * @throws DriverSQLException if an error occurs
+ */
+ int32_t retrieveExecuteUpdateResult(const Request &request)
+ throw (SocketIOException, BackendException,
+ ControllerException, ProtocolException,
+ UnexpectedException);
+ /**
+ * Check if the given query already executed or not on the controller we are
+ * currently connected to.
+ *
+ * @param request the request to check
+ * @return a <code>List</code> of ResultSetOrUpdateCount
+ * @throws DriverSQLException if an error occurs
+ */
+ std::list<ResultSetOrUpdateCount> retrieveExecuteResult(const Request
&request)
+ throw (SocketIOException, BackendException,
+ ControllerException, ProtocolException,
+ UnexpectedException);
+ /**
+ * Fetches multiple results from a query executed with Statement.execute()
+ * @return the list of results
+ */
+ std::list<ResultSetOrUpdateCount> fetchMultipleResultsFromStream()
+ throw (SocketIOException, BackendException,
+ ControllerException, ProtocolException,
+ UnexpectedException);
};
} //namespace CarobNS
#endif //_CONNECTION_H_
Index: carob/src/Connection.cpp
diff -u carob/src/Connection.cpp:1.36 carob/src/Connection.cpp:1.37
--- carob/src/Connection.cpp:1.36 Thu Dec 8 15:18:37 2005
+++ carob/src/Connection.cpp Mon Dec 12 14:52:33 2005
@@ -16,7 +16,7 @@
* limitations under the License.
*
* Initial developer(s): Gilles Rayrat
- * Contributor(s):
+ * Contributor(s): Marc Herbert
*/
#include "Connection.hpp"
@@ -42,7 +42,8 @@
isClosed(true),
autoCommit(true),
readOnly(false),
- writeExecutedInTransaction(false)
+ writeExecutedInTransaction(false),
+ mustBeginTransaction(false)
{
wstring fctName(L"Connection::Connection");
isClosed = true;
@@ -200,7 +201,7 @@
{
wstring msg(fctName + L" Authentication failed. Error ");
if (!authenticatedRead)
- msg += L"while reading controller authenticated";
+ msg += L"while reading controller authenticated ack";
else if (!needsSkelRead)
msg += L"while reading need of sql skeleton";
else
@@ -234,6 +235,7 @@
if (isDebugEnabled())
logDebug(fctName, L"Resetting connection and adding it to the pool");
autoCommit = true;
+ mustBeginTransaction = false;
readOnly = false;
sendCommand(*driverSocketPtr, Reset);
//TODO: Pool connection !!!
@@ -312,11 +314,23 @@
wstring fctName(L"Connection::setAutoCommit");
connectionCS.Enter();
-
+
+ if (mustBeginTransaction) // so this.autoCommit is false
+ {
+ if (autoCommitPrm)
+ { // Just cancel the need to start a transaction
+ autoCommit = true;
+ mustBeginTransaction = false;
+ return;
+ }
+ }
+
+ // nothing to change
if (autoCommit == autoCommitPrm)
{
return;
}
+ // autocommit false -> true
if (autoCommitPrm)
{
//TODO: Should we try/catch ?
@@ -330,11 +344,15 @@
autoCommit = true;
return;
}
- else
+ else // autocommit true -> false
{
- sendCommand(*driverSocketPtr, Begin);
+
+ mustBeginTransaction = true;
+
+ // send the BEGIN right now (unlike pgjdbc & others)
+ beginTransactionIfNeeded();
+
- transactionId = receiveLongOrException();
autoCommit = false;
if (isDebugEnabled())
@@ -345,6 +363,17 @@
connectionCS.Leave();
}
+void Connection::beginTransactionIfNeeded() throw (SocketIOException,
+BackendException, ControllerException, ProtocolException, UnexpectedException)
+{
+ if (mustBeginTransaction)
+ {
+ sendCommand(*driverSocketPtr, Begin);
+ transactionId = receiveLongOrException();
+ mustBeginTransaction = false;
+ }
+}
+
void Connection::setReadOnly(bool readOnlyPrm) throw (SocketIOException,
DriverException, UnexpectedException)
{
@@ -368,36 +397,31 @@
connectionCS.Leave();
throw DriverException(L"Trying to commit a connection in autocommit mode");
}
- int64_t firstTransactionId = transactionId;
try
{
sendCommand(*driverSocketPtr, Commit);
-
- // Commit is followed by a BEGIN
- transactionId = receiveLongOrException();
- writeExecutedInTransaction = false;
- if (isDebugEnabled())
+ int64_t acknowledgedTransactionId = receiveLongOrException();
+ if (acknowledgedTransactionId != transactionId)
{
- logDebug(fctName,L"New transaction " + toWString(transactionId)
- + L" has been started");
+ throw (DriverException(L"Protocol error during commit (acknowledge
transaction ID = "
+ + toWString(acknowledgedTransactionId) + L", expected
transaction ID = "
+ + toWString(transactionId) + L")"));
}
+ // Commit must be followed by a BEGIN
+ mustBeginTransaction = true;
}
catch (SocketIOException e)
{
if (isWarningEnabled())
{
logWarning(fctName, L"I/O Error occured around commit of transaction '"
- + toWString(firstTransactionId) + L"\n" + e.description());
+ + toWString(transactionId) + L"\n" + e.description());
}
- autoCommit = true;
- transactionId = 0;
connectionCS.Leave();
throw;
}
catch (...)
{
- autoCommit = true;
- transactionId = 0;
connectionCS.Leave();
throw;
}
@@ -417,36 +441,31 @@
throw DriverException(L"Trying to rollback a connection in autocommit
mode");
}
- int64_t initialTransactionId = transactionId;
try
{
sendCommand(*driverSocketPtr, Rollback);
+ int64_t acknowledgedTransactionId = receiveLongOrException();
+ if (acknowledgedTransactionId != transactionId)
+ {
+ throw (DriverException(L"Protocol error during rollback (acknowledge
transaction ID = "
+ + toWString(acknowledgedTransactionId) + L", expected
transaction ID = "
+ + toWString(transactionId) + L")"));
+ }
// Rollback is followed by a BEGIN
- transactionId = receiveLongOrException();
- writeExecutedInTransaction = false;
-
- if (isDebugEnabled())
- logDebug(fctName, L"Transaction " + toWString(transactionId)
- + L" has been started");
+ mustBeginTransaction = true;
}
catch (SocketIOException e)
{
if (isWarningEnabled())
{
logWarning(fctName, L"I/O Error occured around rollback of transaction '"
- + toWString(initialTransactionId) + L"\n" + e.description());
+ + toWString(transactionId) + L"\n" + e.description());
}
- // Transaction has been aborted by the controller
- autoCommit = true;
- transactionId = 0;
connectionCS.Leave();
- throw DriverException(L"I/O Error occured around rollback of transaction '"
- + toWString(initialTransactionId)+ e.description());
+ throw;
}
catch (...)
{
- autoCommit = true;
- transactionId = 0;
connectionCS.Leave();
throw;
}
@@ -480,12 +499,13 @@
{
wstring fctName(L"Connection::statementExecuteQuery");
-
+
connectionCS.Enter();
DriverResultSet* retVal = NULL;
checkIfConnected();
+ beginTransactionIfNeeded();
try
{
@@ -532,12 +552,17 @@
int32_t controllerResponse = -1;
checkIfConnected();
+ beginTransactionIfNeeded();
+
+ bool requestIdIsSet = false;
try
{
setConnectionParametersOnRequest(request);
sendCommand(*driverSocketPtr, StatementExecuteUpdate);
writeRequestOnStream(request);
+ request.setId(receiveLongOrException());
+ requestIdIsSet = true;
controllerResponse = receiveIntOrException();
}
catch (SocketIOException sioe)
@@ -547,6 +572,18 @@
+ L"). Connection probably lost";
if (isErrorEnabled())
logError(fctName, msg);
+
+ //TODO:
+ //reconnect()
+// if (requestIdIsSet)
+// { // Controller handled the query, check if it was executed
+// int result = retrieveExecuteUpdateResult(request);
+// if (result != -1)
+// return result;
+// }
+ // At this point the query failed before any controller succeeded in
+ // executing the query
+
connectionCS.Leave();
throw SocketIOException(msg);
//in case exception is not catched
@@ -572,34 +609,22 @@
std::list<ResultSetOrUpdateCount> results;
checkIfConnected();
+ beginTransactionIfNeeded();
+
+ bool requestIdIsSet = false;
try
{
setConnectionParametersOnRequest(request);
+ if (!autoCommit)
+ writeExecutedInTransaction = true;
sendCommand(*driverSocketPtr, StatementExecute);
request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
if (isDebugEnabled())
logDebug(fctName, L"Executing Statement.execute(" + (wstring)request +
L")");
-
- bool hasResult = false;
- ResultSetOrUpdateCount resultReceived;
- do
- {
- hasResult = receiveBoolOrException();
- if (hasResult)
- {
- resultReceived.isResultSet = true;
- resultReceived.resultSetPtr = receiveResultSet();
- }
- else
- {
- resultReceived.isResultSet = false;
- resultReceived.updateCount = receiveIntOrException();
- }
- results.push_back(resultReceived);
-
- }
- while (hasResult || resultReceived.updateCount != -1);
+ request.setId(receiveLongOrException());
+ requestIdIsSet = true;
+ results = fetchMultipleResultsFromStream();
}
catch (SocketIOException sioe)
{
@@ -608,6 +633,19 @@
+ L"). Connection probably lost";
if (isErrorEnabled())
logError(fctName, msg);
+ //TODO:
+// reconnect();
+// if (requestIdIsSet)
+// { // Controller handled the query, check if it was executed
+// List result = retrieveExecuteResult(request);
+// if (result != null)
+// return result;
+// }
+// // At this point the query failed before any controller succeeded in
+// // executing the query
+//
+// return statementExecute(request);
+
connectionCS.Leave();
throw SocketIOException(msg);
//in case exception is not catched
@@ -814,3 +852,57 @@
//just to avoid compiler warnings
return NULL;
}
+
+DriverResultSet* Connection::retrieveExecuteQueryResult(const Request &request)
+ throw (SocketIOException, BackendException, ControllerException,
+ ProtocolException, UnexpectedException)
+{
+ sendCommand(*driverSocketPtr, RetrieveExecuteQueryResult);
+ request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+ return receiveResultSet();
+}
+
+int32_t Connection::retrieveExecuteUpdateResult(const Request &request)
+ throw (SocketIOException, BackendException, ControllerException,
+ ProtocolException, UnexpectedException)
+{
+ sendCommand(*driverSocketPtr, RetrieveExecuteUpdateResult);
+ request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+ return receiveIntOrException();
+}
+std::list<ResultSetOrUpdateCount> Connection::retrieveExecuteResult(const
Request &request)
+ throw (SocketIOException, BackendException, ControllerException,
+ ProtocolException, UnexpectedException)
+{
+ sendCommand(*driverSocketPtr, RetrieveExecuteResult);
+ request.sendToStream(*driverSocketPtr, controllerNeedsSqlSkeleton);
+ return fetchMultipleResultsFromStream();
+}
+
+std::list<ResultSetOrUpdateCount> Connection::fetchMultipleResultsFromStream()
+ throw (SocketIOException, BackendException,
+ ControllerException, ProtocolException,
+ UnexpectedException)
+{
+ std::list<ResultSetOrUpdateCount> results;
+ bool hasResult = false;
+ ResultSetOrUpdateCount resultReceived;
+ do
+ {
+ hasResult = receiveBoolOrException();
+ if (hasResult)
+ {
+ resultReceived.isResultSet = true;
+ resultReceived.resultSetPtr = receiveResultSet();
+ }
+ else
+ {
+ resultReceived.isResultSet = false;
+ resultReceived.updateCount = receiveIntOrException();
+ }
+ results.push_back(resultReceived);
+
+ }
+ while (hasResult || resultReceived.updateCount != -1);
+ return results;
+}
_______________________________________________
Carob-commits mailing list
[email protected]
https://forge.continuent.org/mailman/listinfo/carob-commits