phrocker commented on a change in pull request #482: Minificpp 658 - Port Raw Site to Site to C URL: https://github.com/apache/nifi-minifi-cpp/pull/482#discussion_r260791618
########## File path: nanofi/src/sitetosite/CRawSocketProtocol.c ########## @@ -0,0 +1,1026 @@ +/** + * Site2SiteProtocol class implementation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <stdlib.h> +#include <stdio.h> +#include <string.h> + +#include "uthash.h" +#include "sitetosite/CRawSocketProtocol.h" +#include "sitetosite/CPeer.h" + +#include "core/cstream.h" + +#include "api/nanofi.h" +#include "core/log.h" + +static const char *HandShakePropertyStr[MAX_HANDSHAKE_PROPERTY] = { +/** + * Boolean value indicating whether or not the contents of a FlowFile should + * be GZipped when transferred. + */ +"GZIP", +/** + * The unique identifier of the port to communicate with + */ +"PORT_IDENTIFIER", +/** + * Indicates the number of milliseconds after the request was made that the + * client will wait for a response. If no response has been received by the + * time this value expires, the server can move on without attempting to + * service the request because the client will have already disconnected. + */ +"REQUEST_EXPIRATION_MILLIS", +/** + * The preferred number of FlowFiles that the server should send to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. + */ +"BATCH_COUNT", +/** + * The preferred number of bytes that the server should send to the client + * when pulling data. This property was introduced in version 5 of the + * protocol. + */ +"BATCH_SIZE", +/** + * The preferred amount of time that the server should send data to the + * client when pulling data. This property was introduced in version 5 of + * the protocol. Value is in milliseconds. + */ +"BATCH_DURATION" }; + +typedef struct { + const char * name; + char value[40]; + UT_hash_handle hh; +} PropertyValue; + +int handShake(struct CRawSiteToSiteClient * client) { + if (client->_peer_state != ESTABLISHED) { + //client->logger_->log_error("Site2Site peer state is not established while handshake"); + return -1; + } + //client->logger_->log_debug("Site2Site Protocol Perform hand shake with destination port %s", client->_port_id_str); + + CIDGenerator gen; + gen.implementation_ = CUUID_DEFAULT_IMPL; + generate_uuid(&gen, client->_commsIdentifier); + client->_commsIdentifier[36]='\0'; + + int ret = writeUTF(client->_commsIdentifier, strlen(client->_commsIdentifier), False, client->_peer->_stream); + + if (ret <= 0) { + return -1; + } + + uint32_t prop_size; + PropertyValue *current, *tmp, * properties = NULL; + + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[GZIP]; + strncpy(current->value, "false", strlen("false") +1); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[PORT_IDENTIFIER]; + strncpy(current->value, client->_port_id_str, strlen(client->_port_id_str) +1); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[REQUEST_EXPIRATION_MILLIS]; + sprintf(current->value, "%llu", client->_timeout); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + prop_size = 3; + + if (client->_currentVersion >= 5) { + if (client->_batch_count > 0) { + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[BATCH_COUNT]; + sprintf(current->value, "%llu", client->_batch_count); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + prop_size++; + } + if (client->_batch_size > 0) { + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[BATCH_SIZE]; + sprintf(current->value, "%llu", client->_batch_size); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + prop_size++; + } + if (client->_batch_duration > 0) { + current = (PropertyValue *)malloc(sizeof(PropertyValue)); + + current->name = HandShakePropertyStr[BATCH_DURATION]; + sprintf(current->value, "%llu", client->_batch_duration); + + HASH_ADD_KEYPTR(hh, properties, current->name, strlen(current->name), current); + + prop_size++; + } + } + + if (client->_currentVersion >= 3) { + + //ret = client->_peer->writeUTF(client->_peer->getURL()); + const char * urlstr = getURL(client->_peer); + ret = writeUTF(urlstr, strlen(urlstr), False, client->_peer->_stream); + if (ret <= 0) { + return -1; + } + } + + ret = write_uint32_t(prop_size, client->_peer->_stream); + if (ret <= 0) { + return -1; + } + + HASH_ITER(hh, properties, current, tmp) { + if(writeUTF(current->name, strlen(current->name), False, client->_peer->_stream) <= 0) { + return -1; + } + if(writeUTF(current->value, strlen(current->value), False, client->_peer->_stream) <= 0) { + return -1; + } + //client->logger_->log_debug("Site2Site Protocol Send handshake properties %s %s", current->name, current->value); + HASH_DEL(properties, current); + free(current); + } + + RespondCode code; + + ret = readResponse(client, &code); + + if (ret <= 0) { + return -1; + } + + RespondCodeContext *resCode = getRespondCodeContext(code); + + if (resCode->hasDescription) { + uint32_t utflen; + ret = readUTFLen(&utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + + memset(client->_description_buffer, 0, utflen+1); + ret = readUTF(client->_description_buffer, utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + } + + const char * error = ""; + + switch (code) { + case PROPERTIES_OK: + logc(debug, "%s", "Site2Site HandShake Completed"); + client->_peer_state = HANDSHAKED; + return 0; + case PORT_NOT_IN_VALID_STATE: + error = "in invalid state"; + break; + case UNKNOWN_PORT: + error = "an unknown port"; + break; + case PORTS_DESTINATION_FULL: + error = "full"; + break; + // Unknown error + default: + logc(err, "HandShake Failed because of unknown respond code %d", code); + return -1; + } + + // All known error cases handled here + logc(err, "Site2Site HandShake Failed because destination port, %s, is %s", client->_port_id_str, error); + return -2; +} + + +/*bool CRawSiteToSiteClient::getPeerList(std::vector<CPeerStatus> &peers) { + if (establish(this) == 0 && handShake()) { + int status = writeRequestType(this, REQUEST_PEER_LIST); + + if (status <= 0) { + tearDown(this); + return false; + } + + uint32_t number; + status = _peer->read(number); + + if (status <= 0) { + tearDown(this); + return false; + } + + for (uint32_t i = 0; i < number; i++) { + std::string host; + status = _peer->readUTF(host); + if (status <= 0) { + tearDown(this); + return false; + } + uint32_t port; + status = _peer->read(port); + if (status <= 0) { + tearDown(this); + return false; + } + uint8_t secure; + status = _peer->read(secure); + if (status <= 0) { + tearDown(this); + return false; + } + uint32_t count; + status = _peer->read(count); + if (status <= 0) { + tearDown(this); + return false; + } + CPeerStatus status(std::make_shared<CPeer>(port_id_, host, port, secure), count, true); + peers.push_back(std::move(status)); + logging::LOG_TRACE(logger_) << "Site2Site Peer host " << host << " port " << port << " Secure " << secure; + } + + tearDown(this); + return true; + } else { + tearDown(this); + return false; + } +}*/ + +int bootstrap(struct CRawSiteToSiteClient * client) { + if (client->_peer_state == READY) + return 0; + + tearDown(client); + + if (establish(client) ==0 && handShake(client) == 0 && negotiateCodec(client) == 0) { + logc(debug, "%s", "Site to Site ready for data transaction"); + return 0; + } else { + tearDown(client); + return -1; + } +} + +CTransaction* createTransaction(struct CRawSiteToSiteClient * client, TransferDirection direction) { + int ret; + int dataAvailable = 0; + CTransaction* transaction = NULL; + + if (client->_peer_state != READY) { + bootstrap(client); + } + + if (client->_peer_state != READY) { + return transaction; + } + + if (direction == RECEIVE) { + ret = writeRequestType(client, RECEIVE_FLOWFILES); + + if (ret <= 0) { + return transaction; + } + + RespondCode code; + + ret = readResponse(client, &code); + + if (ret <= 0) { + return transaction; + } + + RespondCodeContext *resCode = getRespondCodeContext(code); + + if (resCode->hasDescription) { + uint32_t utflen; + ret = readUTFLen(&utflen, client->_peer->_stream); + if (ret <= 0) + return transaction; + memset(client->_description_buffer, 0, utflen+1); + ret = readUTF(client->_description_buffer, utflen, client->_peer->_stream); + if (ret <= 0) + return transaction; + } + + switch (code) { + case MORE_DATA: + dataAvailable = 1; + logc(trace, "%s", "Site2Site peer indicates that data is available"); + break; + case NO_MORE_DATA: + dataAvailable = 0; + logc(trace, "%s", "Site2Site peer indicates that no data is available"); + break; + default: + logc(warn, "Site2Site got unexpected response %d when asking for data", code); + return NULL; + } + transaction = (CTransaction*)malloc(1* sizeof(CTransaction)); + InitTransaction(transaction, direction, client->_peer->_stream); + addTransaction(client, transaction); + setDataAvailable(transaction, dataAvailable); + logc(trace, "Site2Site create transaction %s", getUUIDStr(transaction)); + return transaction; + } else { + ret = writeRequestType(client, SEND_FLOWFILES); + + if (ret <= 0) { + return NULL; + } else { + transaction = (CTransaction*)malloc(1* sizeof(CTransaction)); + InitTransaction(transaction, direction, client->_peer->_stream); + addTransaction(client, transaction); + logc(trace, "Site2Site create transaction %s", getUUIDStr(transaction)); + return transaction; + } + } +} + +int transmitPayload(struct CRawSiteToSiteClient * client, const char * payload, const attribute_set * attributes) { + CTransaction* transaction = NULL; + + if (payload == NULL && attributes == NULL) { + return -1; + } + + if (client->_peer_state != READY) { + if (bootstrap(client) != 0) { + return -1; + } + } + + if (client->_peer_state != READY) { + tearDown(client); + } + + // Create the transaction + const char * transactionID; + transaction = createTransaction(client, SEND); + + if (transaction == NULL) { + tearDown(client); + return -1; + } + + transactionID = getUUIDStr(transaction); + + CDataPacket packet; + + initPacket(&packet, transaction, attributes, payload); + + int16_t resp = sendPacket(client, transactionID, &packet, NULL); + if (resp != 0) { + deleteTransaction(client, transactionID); + tearDown(client); + return resp; + } + logc(info, "Site2Site transaction %s sent bytes length %lu", transactionID, strlen(payload)); + + + int ret = confirm(client, transactionID); + + if(ret == 0) { + ret = complete(client, transactionID); + } + + deleteTransaction(client, transactionID); + + if (ret != 0) { + tearDown(client); + } + + return ret; +} + +// Complete the transaction +int complete(struct CRawSiteToSiteClient * client, const char * transactionID) { + if (client->_peer_state != READY) { + bootstrap(client); + } + + if (client->_peer_state != READY) { + return -1; + } + + CTransaction* transaction = findTransaction(client, transactionID); + + if (!transaction) { + return -1; + } + + if (transaction->total_transfers_ > 0 && getState(transaction) != TRANSACTION_CONFIRMED) { + return -1; + } + if (getDirection(transaction) == RECEIVE) { + if (transaction->current_transfers_ == 0) { + transaction->_state = TRANSACTION_COMPLETED; + return 0; + } else { + logc(debug, "Site2Site transaction %s send finished", transactionID); + if(writeResponse(client, TRANSACTION_FINISHED, "Finished") <= 0) { + return -1; + } else { + transaction->_state = TRANSACTION_COMPLETED; + return 0; + } + } + } else { + RespondCode code; + + if (readResponse(client, &code) <= 0) { + return -1; + } + + RespondCodeContext *resCode = getRespondCodeContext(code); + + if (resCode->hasDescription) { + uint32_t utflen; + int ret = readUTFLen(&utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + memset(client->_description_buffer, 0, utflen+1); + ret = readUTF(client->_description_buffer, utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + } + + if (code == TRANSACTION_FINISHED) { + logc(debug, "Site2Site transaction %s peer finished transaction", transactionID); + transaction->_state = TRANSACTION_COMPLETED; + return 0; + } else { + logc(warn, "Site2Site transaction %s peer unknown respond code %d", transactionID, code); + return -1; + } + } +} + +int confirm(struct CRawSiteToSiteClient * client, const char * transactionID) { + + if (client->_peer_state != READY) { + bootstrap(client); + } + + if (client->_peer_state != READY) { + return -1; + } + + CTransaction* transaction = findTransaction(client, transactionID); + + if (!transaction) { + return -1; + } + + if (getState(transaction) == TRANSACTION_STARTED && isDataAvailable(transaction) == 0 && getDirection(transaction) == RECEIVE) { + transaction->_state = TRANSACTION_CONFIRMED; + return 0; + } + + if (getState(transaction) != DATA_EXCHANGED) + return -1; + + if (getDirection(transaction) == RECEIVE) { + if (isDataAvailable(transaction) != 0) + return -1; + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + int64_t crcValue = getCRC(transaction); + char crc[40]; + sprintf(crc, "%lld", crcValue); + + logc(debug, "Site2Site Send confirm with CRC %lld to transaction %s", crcValue, transactionID); + if (writeResponse(client, CONFIRM_TRANSACTION, crc) <= 0) { + return -1; + } + + RespondCode code; + if (readResponse(client, &code) <= 0) { + return -1; + } + + RespondCodeContext *resCode = getRespondCodeContext(code); + + if (resCode->hasDescription) { + uint32_t utflen; + int ret = readUTFLen(&utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + memset(client->_description_buffer, 0, utflen+1); + ret = readUTF(client->_description_buffer, utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + } + + if (code == CONFIRM_TRANSACTION) { + logc(debug, "Site2Site transaction %s peer confirm transaction", transactionID); + transaction->_state = TRANSACTION_CONFIRMED; + return 0; + } else if (code == BAD_CHECKSUM) { + logc(debug, "Site2Site transaction %s peer indicate bad checksum", transactionID); + return -1; + } else { + logc(debug, "Site2Site transaction %s peer unknown response code %d", transactionID, code); + return -1; + } + } else { + logc(debug, "Site2Site Send FINISH TRANSACTION for transaction %s", transactionID); + if (writeResponse(client, FINISH_TRANSACTION, "FINISH_TRANSACTION") <= 0) { + return -1; + } + + RespondCode code; + if(readResponse(client, &code) <= 0) { + return -1; + } + + RespondCodeContext *resCode = getRespondCodeContext(code); + + if (resCode->hasDescription) { + uint32_t utflen; + int ret = readUTFLen(&utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + memset(client->_description_buffer, 0, utflen+1); + ret = readUTF(client->_description_buffer, utflen, client->_peer->_stream); + if (ret <= 0) + return -1; + } + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + if (code == CONFIRM_TRANSACTION) { + logc(debug, "Site2Site transaction %s peer confirm transaction with CRC %s", transactionID, client->_description_buffer); + + if (client->_currentVersion > 3) { + int64_t crcValue = getCRC(transaction); + char crc[40]; + memset(crc, 0, 40); + sprintf(crc, "%lld", crcValue); + + if (strcmp(client->_description_buffer, crc) == 0) { + logc(debug, "Site2Site transaction %s CRC matched", transactionID); + if(writeResponse(client, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION") <= 0) { + return -1; + } + transaction->_state = TRANSACTION_CONFIRMED; + return 0; + } else { + logc(warn, "Site2Site transaction %s CRC not matched %s", transactionID, crc); + writeResponse(client, BAD_CHECKSUM, "BAD_CHECKSUM"); + return -1; + } + } + if (writeResponse(client, CONFIRM_TRANSACTION, "CONFIRM_TRANSACTION") <= 0) { + return -1; + } + transaction->_state = TRANSACTION_CONFIRMED; + return 0; + } else { + logc(debug, "Site2Site transaction %s peer unknown respond code %d", transactionID, code); + return -1; + } + } +} + + int16_t sendPacket(struct CRawSiteToSiteClient * client, const char * transactionID, CDataPacket *packet, flow_file_record * ff) { + + if (client->_peer_state != READY) { + bootstrap(client); + } + + if (client->_peer_state != READY) { + return -1; + } + CTransaction* transaction = findTransaction(client, transactionID); + + if (!transaction) { + return -1; + } + + if (getState(transaction) != TRANSACTION_STARTED && getState(transaction) != DATA_EXCHANGED) { + logc(warn, "Site2Site transaction %s is not at started or exchanged state", transactionID); + return -1; + } + + if (getDirection(transaction) != SEND) { + logc(warn, "Site2Site transaction %s direction is wrong", transactionID); + return -1; + } + + int ret; + + if (transaction->current_transfers_ > 0) { + ret = writeResponse(client, CONTINUE_TRANSACTION, "CONTINUE_TRANSACTION"); + if (ret <= 0) { + return -1; + } + } + // start to read the packet + uint32_t numAttributes = packet->_attributes->size; + ret = write_uint32t(transaction, numAttributes); Review comment: Is write_uint32t a variation of write_uint32_t? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
