Author: rhs
Date: Wed Oct 3 18:16:40 2012
New Revision: 1393645
URL: http://svn.apache.org/viewvc?rev=1393645&view=rev
Log:
added codec support for timestamps and use it to provide the correct type for
timestamp fields in message headers
Modified:
qpid/proton/trunk/proton-c/bindings/python/proton.py
qpid/proton/trunk/proton-c/include/proton/codec.h
qpid/proton/trunk/proton-c/src/codec/codec.c
qpid/proton/trunk/proton-c/src/message/message.c
qpid/proton/trunk/tests/proton_tests/codec.py
Modified: qpid/proton/trunk/proton-c/bindings/python/proton.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/bindings/python/proton.py?rev=1393645&r1=1393644&r2=1393645&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/bindings/python/proton.py (original)
+++ qpid/proton/trunk/proton-c/bindings/python/proton.py Wed Oct 3 18:16:40 2012
@@ -663,6 +663,7 @@ class Data:
INT = PN_INT; "A signed int value."
ULONG = PN_ULONG; "An unsigned long value."
LONG = PN_LONG; "A signed long value."
+ TIMESTAMP = PN_TIMESTAMP; "A timestamp value."
FLOAT = PN_FLOAT; "A float value."
DOUBLE = PN_DOUBLE; "A double value."
BINARY = PN_BINARY; "A binary string."
@@ -910,6 +911,14 @@ class Data:
"""
self._check(pn_data_put_long(self._data, l))
+ def put_timestamp(self, t):
+ """
+ Puts a timestamp value.
+
+ @param t: an integral value
+ """
+ self._check(pn_data_put_timestamp(self._data, t))
+
def put_float(self, f):
"""
Puts a float value.
@@ -1117,6 +1126,15 @@ class Data:
self._check(err)
return value
+ def get_timestamp(self):
+ """
+ If the current node is a timestamp, returns its value, raises
+ an exception otherwise.
+ """
+ err, value = pn_data_get_timestamp(self._data)
+ self._check(err)
+ return value
+
def get_float(self):
"""
If the current node is a float, returns its value, raises an
Modified: qpid/proton/trunk/proton-c/include/proton/codec.h
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/include/proton/codec.h?rev=1393645&r1=1393644&r2=1393645&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/include/proton/codec.h (original)
+++ qpid/proton/trunk/proton-c/include/proton/codec.h Wed Oct 3 18:16:40 2012
@@ -43,6 +43,7 @@ typedef enum {
PN_INT,
PN_ULONG,
PN_LONG,
+ PN_TIMESTAMP,
PN_FLOAT,
PN_DOUBLE,
PN_BINARY,
@@ -67,6 +68,7 @@ typedef struct {
int32_t as_int;
uint64_t as_ulong;
int64_t as_long;
+ pn_timestamp_t as_timestamp;
float as_float;
double as_double;
pn_bytes_t as_binary;
@@ -117,6 +119,7 @@ int pn_data_put_uint(pn_data_t *data, ui
int pn_data_put_int(pn_data_t *data, int32_t i);
int pn_data_put_ulong(pn_data_t *data, uint64_t ul);
int pn_data_put_long(pn_data_t *data, int64_t l);
+int pn_data_put_timestamp(pn_data_t *data, pn_timestamp_t t);
int pn_data_put_float(pn_data_t *data, float f);
int pn_data_put_double(pn_data_t *data, double d);
int pn_data_put_binary(pn_data_t *data, pn_bytes_t bytes);
@@ -137,6 +140,7 @@ int pn_data_get_uint(pn_data_t *data, ui
int pn_data_get_int(pn_data_t *data, int32_t *i);
int pn_data_get_ulong(pn_data_t *data, uint64_t *ul);
int pn_data_get_long(pn_data_t *data, int64_t *l);
+int pn_data_get_timestamp(pn_data_t *data, pn_timestamp_t *l);
int pn_data_get_float(pn_data_t *data, float *f);
int pn_data_get_double(pn_data_t *data, double *d);
int pn_data_get_binary(pn_data_t *data, pn_bytes_t *bytes);
Modified: qpid/proton/trunk/proton-c/src/codec/codec.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/codec/codec.c?rev=1393645&r1=1393644&r2=1393645&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/codec/codec.c (original)
+++ qpid/proton/trunk/proton-c/src/codec/codec.c Wed Oct 3 18:16:40 2012
@@ -79,6 +79,7 @@ const char *pn_type_str(pn_type_t type)
case PN_INT: return "PN_INT";
case PN_ULONG: return "PN_ULONG";
case PN_LONG: return "PN_LONG";
+ case PN_TIMESTAMP: return "PN_TIMESTAMP";
case PN_FLOAT: return "PN_FLOAT";
case PN_DOUBLE: return "PN_DOUBLE";
case PN_BINARY: return "PN_BINARY";
@@ -157,6 +158,8 @@ int pn_format_atom(pn_bytes_t *bytes, pn
return pn_bytes_format(bytes, "%" PRIu64, atom.u.as_ulong);
case PN_LONG:
return pn_bytes_format(bytes, "%" PRIi64, atom.u.as_long);
+ case PN_TIMESTAMP:
+ return pn_bytes_format(bytes, "%" PRIi64, atom.u.as_timestamp);
case PN_FLOAT:
return pn_bytes_format(bytes, "%g", atom.u.as_float);
case PN_DOUBLE:
@@ -403,6 +406,7 @@ uint8_t pn_type2code(pn_type_t type)
case PN_INT: return PNE_INT;
case PN_FLOAT: return PNE_FLOAT;
case PN_LONG: return PNE_LONG;
+ case PN_TIMESTAMP: return PNE_MS64;
case PN_DOUBLE: return PNE_DOUBLE;
case PN_ULONG: return PNE_ULONG;
case PN_BINARY: return PNE_VBIN32;
@@ -530,6 +534,7 @@ int pn_encode_value(pn_bytes_t *bytes, p
case PN_INT: return pn_bytes_writef32(bytes, atom->u.as_int);
case PN_ULONG: return pn_bytes_writef64(bytes, atom->u.as_ulong);
case PN_LONG: return pn_bytes_writef64(bytes, atom->u.as_long);
+ case PN_TIMESTAMP: return pn_bytes_writef64(bytes, atom->u.as_timestamp);
case PN_FLOAT: c.f = atom->u.as_float; return pn_bytes_writef32(bytes, c.i);
case PN_DOUBLE: c.d = atom->u.as_double; return pn_bytes_writef64(bytes, c.l);
case PN_BINARY: return pn_bytes_writev32(bytes, &atom->u.as_binary);
@@ -633,6 +638,7 @@ pn_type_t pn_code2type(uint8_t code)
return PN_SHORT;
case PNE_UINT0:
case PNE_SMALLUINT:
+ case PNE_SMALLINT:
case PNE_UINT:
return PN_UINT;
case PNE_INT:
@@ -641,10 +647,13 @@ pn_type_t pn_code2type(uint8_t code)
return PN_FLOAT;
case PNE_LONG:
return PN_LONG;
+ case PNE_MS64:
+ return PN_TIMESTAMP;
case PNE_DOUBLE:
return PN_DOUBLE;
case PNE_ULONG0:
case PNE_SMALLULONG:
+ case PNE_SMALLLONG:
case PNE_ULONG:
return PN_ULONG;
case PNE_VBIN8:
@@ -733,6 +742,11 @@ int pn_decode_value(pn_bytes_t *bytes, p
atom.type=PN_UINT, atom.u.as_uint=*((uint8_t *) (bytes->start));
pn_bytes_ltrim(bytes, 1);
break;
+ case PNE_SMALLINT:
+ if (!bytes->size) return PN_UNDERFLOW;
+ atom.type=PN_INT, atom.u.as_uint=*((int8_t *) (bytes->start));
+ pn_bytes_ltrim(bytes, 1);
+ break;
case PNE_INT:
if (bytes->size < 4) return PN_UNDERFLOW;
atom.type=PN_INT, atom.u.as_int=ntohl(*((uint32_t *) (bytes->start)));
@@ -747,6 +761,7 @@ int pn_decode_value(pn_bytes_t *bytes, p
break;
case PNE_ULONG:
case PNE_LONG:
+ case PNE_MS64:
case PNE_DOUBLE:
if (bytes->size < 8) return PN_UNDERFLOW;
@@ -764,6 +779,9 @@ int pn_decode_value(pn_bytes_t *bytes, p
case PNE_LONG:
atom.type=PN_LONG, atom.u.as_long=(int64_t) conv.l;
break;
+ case PNE_MS64:
+ atom.type=PN_TIMESTAMP, atom.u.as_timestamp=(pn_timestamp_t) conv.l;
+ break;
case PNE_DOUBLE:
// XXX: this assumes the platform uses IEEE floats
atom.type=PN_DOUBLE, atom.u.as_double=conv.d;
@@ -782,6 +800,11 @@ int pn_decode_value(pn_bytes_t *bytes, p
atom.type=PN_ULONG, atom.u.as_ulong=*((uint8_t *) (bytes->start));
pn_bytes_ltrim(bytes, 1);
break;
+ case PNE_SMALLLONG:
+ if (!bytes->size) return PN_UNDERFLOW;
+ atom.type=PN_LONG, atom.u.as_long=*((int8_t *) (bytes->start));
+ pn_bytes_ltrim(bytes, 1);
+ break;
case PNE_VBIN8:
case PNE_STR8_UTF8:
case PNE_SYM8:
@@ -993,6 +1016,11 @@ int pn_vfill_one(pn_atoms_t *atoms, cons
atom->u.as_long = va_arg(*ap, int64_t);
(*nvals)++;
return 0;
+ case 't':
+ atom->type = PN_TIMESTAMP;
+ atom->u.as_timestamp = va_arg(*ap, pn_timestamp_t);
+ (*nvals)++;
+ return 0;
case 'f':
atom->type = PN_FLOAT;
atom->u.as_float = va_arg(*ap, double);
@@ -1387,6 +1415,19 @@ int pn_scan_one(pn_atoms_t *atoms, const
}
if (atoms) pn_atoms_ltrim(atoms, 1);
return 0;
+ case 't':
+ {
+ pn_timestamp_t *value = va_arg(*ap, pn_timestamp_t *);
+ if (atom && atom->type == PN_TIMESTAMP) {
+ *value = atom->u.as_timestamp;
+ *scanned = true;
+ } else {
+ *value = 0;
+ *scanned = false;
+ }
+ }
+ if (atoms) pn_atoms_ltrim(atoms, 1);
+ return 0;
case 'f':
{
float *value = va_arg(*ap, float *);
@@ -1821,6 +1862,9 @@ int pn_data_vfill(pn_data_t *data, const
case 'l':
err = pn_data_put_long(data, va_arg(ap, int64_t));
break;
+ case 't':
+ err = pn_data_put_timestamp(data, va_arg(ap, pn_timestamp_t));
+ break;
case 'f':
err = pn_data_put_float(data, va_arg(ap, double));
break;
@@ -2141,6 +2185,20 @@ int pn_data_vscan(pn_data_t *data, const
}
if (resume_count && level == count_level) resume_count--;
break;
+ case 't':
+ {
+ pn_timestamp_t *value = va_arg(ap, pn_timestamp_t *);
+ found = pn_scan_next(data, &type, suspend);
+ if (found && type == PN_TIMESTAMP) {
+ pn_data_get_timestamp(data, value);
+ scanned = true;
+ } else {
+ *value = 0;
+ scanned = false;
+ }
+ }
+ if (resume_count && level == count_level) resume_count--;
+ break;
case 'f':
{
float *value = va_arg(ap, float *);
@@ -2637,6 +2695,10 @@ int pn_data_parse_atoms(pn_data_t *data,
pn_data_put_long(data, atom.u.as_long);
count++;
break;
+ case PN_TIMESTAMP:
+ pn_data_put_timestamp(data, atom.u.as_timestamp);
+ count++;
+ break;
case PN_FLOAT:
pn_data_put_float(data, atom.u.as_float);
count++;
@@ -2873,6 +2935,14 @@ int pn_data_put_long(pn_data_t *data, in
return 0;
}
+int pn_data_put_timestamp(pn_data_t *data, pn_timestamp_t t)
+{
+ pn_node_t *node = pn_data_add(data);
+ node->atom.type = PN_TIMESTAMP;
+ node->atom.u.as_timestamp = t;
+ return 0;
+}
+
int pn_data_put_float(pn_data_t *data, float f)
{
pn_node_t *node = pn_data_add(data);
@@ -3085,6 +3155,18 @@ int pn_data_get_long(pn_data_t *data, in
}
}
+int pn_data_get_timestamp(pn_data_t *data, pn_timestamp_t *t)
+{
+ pn_node_t *node = pn_data_current(data);
+ if (node->atom.type == PN_TIMESTAMP) {
+ *t = node->atom.u.as_timestamp;
+ return 0;
+ } else {
+ *t = 0;
+ return PN_ERR;
+ }
+}
+
int pn_data_get_float(pn_data_t *data, float *f)
{
pn_node_t *node = pn_data_current(data);
Modified: qpid/proton/trunk/proton-c/src/message/message.c
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-c/src/message/message.c?rev=1393645&r1=1393644&r2=1393645&view=diff
==============================================================================
--- qpid/proton/trunk/proton-c/src/message/message.c (original)
+++ qpid/proton/trunk/proton-c/src/message/message.c Wed Oct 3 18:16:40 2012
@@ -448,7 +448,7 @@ int pn_message_decode(pn_message_t *msg,
{
pn_bytes_t user_id, address, subject, reply_to, ctype, cencoding,
group_id, reply_to_group_id;
- err = pn_data_scan(msg->data, "D.[.zSSS.ssLLSIS]", &user_id, &address,
+ err = pn_data_scan(msg->data, "D.[.zSSS.ssttSIS]", &user_id, &address,
&subject, &reply_to, &ctype, &cencoding,
&msg->expiry_time, &msg->creation_time, &group_id,
&msg->group_sequence, &reply_to_group_id);
@@ -503,14 +503,14 @@ int pn_message_encode(pn_message_t *msg,
pn_data_clear(msg->data);
- int err = pn_data_fill(msg->data, "DL[oBIoI]", HEADER, msg->durable,
- msg->priority, msg->ttl, msg->first_acquirer,
+ int err = pn_data_fill(msg->data, "DL[oB?IoI]", HEADER, msg->durable,
+ msg->priority, msg->ttl, msg->ttl, msg->first_acquirer,
msg->delivery_count);
if (err)
return pn_error_format(msg->error, err, "data error: %s",
pn_data_error(msg->data));
- err = pn_data_fill(msg->data, "DL[nzSSSnssLLSiS]", PROPERTIES,
+ err = pn_data_fill(msg->data, "DL[nzSSSnssttSIS]", PROPERTIES,
pn_buffer_bytes(msg->user_id),
pn_buffer_str(msg->address),
pn_buffer_str(msg->subject),
Modified: qpid/proton/trunk/tests/proton_tests/codec.py
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/tests/proton_tests/codec.py?rev=1393645&r1=1393644&r2=1393645&view=diff
==============================================================================
--- qpid/proton/trunk/tests/proton_tests/codec.py (original)
+++ qpid/proton/trunk/tests/proton_tests/codec.py Wed Oct 3 18:16:40 2012
@@ -171,3 +171,6 @@ class DataTest(Test):
def testSymbol(self):
self._test("symbol", "this is a symbol test", "bleh", "blah")
+
+ def testTimestamp(self):
+ self._test("timestamp", 0, 12345, 1000000)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]