Author: millecker
Date: Sat Nov 23 09:49:41 2013
New Revision: 1544764
URL: http://svn.apache.org/r1544764
Log:
HAMA-815: Hama Pipes uses C++ templates (fixed glibc detection)
Modified:
hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
Modified: hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc
URL: http://svn.apache.org/viewvc/hama/trunk/c%2B%2B/src/main/native/pipes/impl/Pipes.cc?rev=1544764&r1=1544763&r2=1544764&view=diff
==============================================================================
--- hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc (original)
+++ hama/trunk/c++/src/main/native/pipes/impl/Pipes.cc Sat Nov 23 09:49:41 2013
@@ -173,7 +173,7 @@ public:
cmd = deserializeInt(*in_stream_);
switch (cmd) {
-
+
case START_MESSAGE: {
int32_t protocol_version;
protocol_version = deserialize<int32_t>(*in_stream_);
@@ -184,7 +184,7 @@ public:
handler_->start(protocol_version);
break;
}
- // setup BSP Job Configuration
+ // setup BSP Job Configuration
case SET_BSPJOB_CONF: {
int32_t entries;
entries = deserialize<int32_t>(*in_stream_);
@@ -310,7 +310,7 @@ public:
if (expected_response_cmd == cmd) {
switch (cmd) {
-
+
case GET_MSG_COUNT: {
T msg_count;
msg_count = deserialize<T>(*in_stream_);
@@ -362,7 +362,7 @@ public:
}
return superstep_count;
}
-
+
case SEQFILE_OPEN: {
T file_id = deserialize<T>(*in_stream_);
if(logging) {
@@ -474,7 +474,7 @@ public:
if ((expected_response_cmd == cmd) || (cmd == END_OF_DATA) ) {
switch (cmd) {
-
+
case READ_KEYVALUE: {
K key = deserialize<K>(*in_stream_);
V value = deserialize<V>(*in_stream_);
@@ -563,6 +563,7 @@ public:
reader_ = NULL;
writer_ = NULL;
protocol_ = NULL;
+ uplink_ = NULL;
done_ = false;
has_task_ = false;
@@ -930,24 +931,6 @@ public:
uplink_->sendCommand(READ_KEYVALUE);
- // TODO
- // check if value is array [0, 1, 2, ...], and remove brackets
- /*
- int len = current_value_.length();
- if ( (current_value_[0]=='[') &&
- (current_value_[len-1]==']') ) {
- value = current_value_.substr(1,len-2);
- } else {
- value = current_value_;
- }
-
- if (logging && key.empty() && value.empty()) {
- fprintf(stderr,"HamaPipes::BSPContextImpl::readNext - Empty KeyValuePair\n");
- }
-
- return (!key.empty() && !value.empty());
- */
-
KeyValuePair<K1,V1> key_value_pair;
key_value_pair = protocol_->template getKeyValueResult<K1,V1>(READ_KEYVALUE);
@@ -1034,18 +1017,6 @@ public:
// send request
uplink_->sendCommand<int32_t>(SEQFILE_READNEXT, file_id);
- // TODO
- /*
- // check if value is array [0, 1, 2, ...], and remove brackets
- int len = current_value_.length();
- if ( (current_value_[0]=='[') &&
- (current_value_[len-1]==']') ) {
- value = current_value_(1,len-2);
- } else {
- value = current_value_;
- }
- */
-
// get response
KeyValuePair<K,V> key_value_pair;
key_value_pair = protocol_->template getKeyValueResult<K,V>(SEQFILE_READNEXT);
@@ -1163,14 +1134,15 @@ public:
}
virtual ~BSPContextImpl() {
+ delete factory_;
delete job_;
- //delete inputSplit_;
- //if (reader) {
- // delete value;
- //}
- delete reader_;
delete bsp_;
+ delete partitioner_;
+ delete reader_;
delete writer_;
+ delete protocol_;
+ delete uplink_;
+ //delete inputSplit_;
pthread_mutex_destroy(&mutex_done_);
}
};
@@ -1307,19 +1279,11 @@ bool runTask(const Factory<K1, V1, K2, V
context->waitForTask();
- //while (!context->isDone()) {
- //context->nextKey();
- //}
-
context->closeAll();
protocol->getUplink()->sendCommand(DONE);
//pthread_join(pingThread,NULL);
- // Cleanup
- delete context;
- delete protocol;
-
if (in_stream != NULL) {
fflush(in_stream);
}
@@ -1336,16 +1300,16 @@ bool runTask(const Factory<K1, V1, K2, V
HADOOP_ASSERT(result == 0, "problem closing socket");
}
- //TODO REFACTOR
- if (in_stream != NULL) {
- //fclose(stream);
- }
- if (out_stream != NULL) {
- //fclose(outStream);
- }
+ // Cleanup
+ delete context;
+ delete protocol;
+
delete bufin;
delete bufout;
+ delete in_stream;
+ delete out_stream;
+
return true;
} catch (Error& err) {