http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h deleted file mode 100644 index cd6ecea..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TBufferTransports.h +++ /dev/null @@ -1,735 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ -#define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 - -#include <cstring> -#include <boost/scoped_array.hpp> - -#include <thrift/transport/TTransport.h> -#include <thrift/transport/TVirtualTransport.h> - -#ifdef __GNUC__ -#define TDB_LIKELY(val) (__builtin_expect((val), 1)) -#define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) -#else -#define TDB_LIKELY(val) (val) -#define TDB_UNLIKELY(val) (val) -#endif - -namespace apache { namespace thrift { namespace transport { - - -/** - * Base class for all transports that use read/write buffers for performance. - * - * TBufferBase is designed to implement the fast-path "memcpy" style - * operations that work in the common case. It does so with small and - * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract - * class. Subclasses are expected to define the "slow path" operations - * that have to be done when the buffers are full or empty. - * - */ -class TBufferBase : public TVirtualTransport<TBufferBase> { - - public: - - /** - * Fast-path read. - * - * When we have enough data buffered to fulfill the read, we can satisfy it - * with a single memcpy, then adjust our internal pointers. If the buffer - * is empty, we call out to our slow path, implemented by a subclass. - * This method is meant to eventually be nonvirtual and inlinable. - */ - uint32_t read(uint8_t* buf, uint32_t len) { - uint8_t* new_rBase = rBase_ + len; - if (TDB_LIKELY(new_rBase <= rBound_)) { - std::memcpy(buf, rBase_, len); - rBase_ = new_rBase; - return len; - } - return readSlow(buf, len); - } - - /** - * Shortcutted version of readAll. - */ - uint32_t readAll(uint8_t* buf, uint32_t len) { - uint8_t* new_rBase = rBase_ + len; - if (TDB_LIKELY(new_rBase <= rBound_)) { - std::memcpy(buf, rBase_, len); - rBase_ = new_rBase; - return len; - } - return apache::thrift::transport::readAll(*this, buf, len); - } - - /** - * Fast-path write. - * - * When we have enough empty space in our buffer to accomodate the write, we - * can satisfy it with a single memcpy, then adjust our internal pointers. - * If the buffer is full, we call out to our slow path, implemented by a - * subclass. This method is meant to eventually be nonvirtual and - * inlinable. - */ - void write(const uint8_t* buf, uint32_t len) { - uint8_t* new_wBase = wBase_ + len; - if (TDB_LIKELY(new_wBase <= wBound_)) { - std::memcpy(wBase_, buf, len); - wBase_ = new_wBase; - return; - } - writeSlow(buf, len); - } - - /** - * Fast-path borrow. A lot like the fast-path read. - */ - const uint8_t* borrow(uint8_t* buf, uint32_t* len) { - if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { - // With strict aliasing, writing to len shouldn't force us to - // refetch rBase_ from memory. TODO(dreiss): Verify this. - *len = static_cast<uint32_t>(rBound_ - rBase_); - return rBase_; - } - return borrowSlow(buf, len); - } - - /** - * Consume doesn't require a slow path. - */ - void consume(uint32_t len) { - if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { - rBase_ += len; - } else { - throw TTransportException(TTransportException::BAD_ARGS, - "consume did not follow a borrow."); - } - } - - - protected: - - /// Slow path read. - virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; - - /// Slow path write. - virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; - - /** - * Slow path borrow. - * - * POSTCONDITION: return == NULL || rBound_ - rBase_ >= *len - */ - virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; - - /** - * Trivial constructor. - * - * Initialize pointers safely. Constructing is not a very - * performance-sensitive operation, so it is okay to just leave it to - * the concrete class to set up pointers correctly. - */ - TBufferBase() - : rBase_(NULL) - , rBound_(NULL) - , wBase_(NULL) - , wBound_(NULL) - {} - - /// Convenience mutator for setting the read buffer. - void setReadBuffer(uint8_t* buf, uint32_t len) { - rBase_ = buf; - rBound_ = buf+len; - } - - /// Convenience mutator for setting the write buffer. - void setWriteBuffer(uint8_t* buf, uint32_t len) { - wBase_ = buf; - wBound_ = buf+len; - } - - virtual ~TBufferBase() {} - - /// Reads begin here. - uint8_t* rBase_; - /// Reads may extend to just before here. - uint8_t* rBound_; - - /// Writes begin here. - uint8_t* wBase_; - /// Writes may extend to just before here. - uint8_t* wBound_; -}; - - -/** - * Buffered transport. For reads it will read more data than is requested - * and will serve future data out of a local buffer. For writes, data is - * stored to an in memory buffer before being written out. - * - */ -class TBufferedTransport - : public TVirtualTransport<TBufferedTransport, TBufferBase> { - public: - - static const int DEFAULT_BUFFER_SIZE = 512; - - /// Use default buffer sizes. - TBufferedTransport(boost::shared_ptr<TTransport> transport) - : transport_(transport) - , rBufSize_(DEFAULT_BUFFER_SIZE) - , wBufSize_(DEFAULT_BUFFER_SIZE) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) - { - initPointers(); - } - - /// Use specified buffer sizes. - TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) - : transport_(transport) - , rBufSize_(sz) - , wBufSize_(sz) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) - { - initPointers(); - } - - /// Use specified read and write buffer sizes. - TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) - : transport_(transport) - , rBufSize_(rsz) - , wBufSize_(wsz) - , rBuf_(new uint8_t[rBufSize_]) - , wBuf_(new uint8_t[wBufSize_]) - { - initPointers(); - } - - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - if (rBase_ == rBound_) { - setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); - } - return (rBound_ > rBase_); - } - - void close() { - flush(); - transport_->close(); - } - - virtual uint32_t readSlow(uint8_t* buf, uint32_t len); - - virtual void writeSlow(const uint8_t* buf, uint32_t len); - - void flush(); - - - /** - * The following behavior is currently implemented by TBufferedTransport, - * but that may change in a future version: - * 1/ If len is at most rBufSize_, borrow will never return NULL. - * Depending on the underlying transport, it could throw an exception - * or hang forever. - * 2/ Some borrow requests may copy bytes internally. However, - * if len is at most rBufSize_/2, none of the copied bytes - * will ever have to be copied again. For optimial performance, - * stay under this limit. - */ - virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); - - boost::shared_ptr<TTransport> getUnderlyingTransport() { - return transport_; - } - - /* - * TVirtualTransport provides a default implementation of readAll(). - * We want to use the TBufferBase version instead. - */ - uint32_t readAll(uint8_t* buf, uint32_t len) { - return TBufferBase::readAll(buf, len); - } - - protected: - void initPointers() { - setReadBuffer(rBuf_.get(), 0); - setWriteBuffer(wBuf_.get(), wBufSize_); - // Write size never changes. - } - - boost::shared_ptr<TTransport> transport_; - - uint32_t rBufSize_; - uint32_t wBufSize_; - boost::scoped_array<uint8_t> rBuf_; - boost::scoped_array<uint8_t> wBuf_; -}; - - -/** - * Wraps a transport into a buffered one. - * - */ -class TBufferedTransportFactory : public TTransportFactory { - public: - TBufferedTransportFactory() {} - - virtual ~TBufferedTransportFactory() {} - - /** - * Wraps the transport into a buffered one. - */ - virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { - return boost::shared_ptr<TTransport>(new TBufferedTransport(trans)); - } - -}; - - -/** - * Framed transport. All writes go into an in-memory buffer until flush is - * called, at which point the transport writes the length of the entire - * binary chunk followed by the data payload. This allows the receiver on the - * other end to always do fixed-length reads. - * - */ -class TFramedTransport - : public TVirtualTransport<TFramedTransport, TBufferBase> { - public: - - static const int DEFAULT_BUFFER_SIZE = 512; - - /// Use default buffer sizes. - TFramedTransport(boost::shared_ptr<TTransport> transport) - : transport_(transport) - , rBufSize_(0) - , wBufSize_(DEFAULT_BUFFER_SIZE) - , rBuf_() - , wBuf_(new uint8_t[wBufSize_]) - { - initPointers(); - } - - TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) - : transport_(transport) - , rBufSize_(0) - , wBufSize_(sz) - , rBuf_() - , wBuf_(new uint8_t[wBufSize_]) - { - initPointers(); - } - - void open() { - transport_->open(); - } - - bool isOpen() { - return transport_->isOpen(); - } - - bool peek() { - return (rBase_ < rBound_) || transport_->peek(); - } - - void close() { - flush(); - transport_->close(); - } - - virtual uint32_t readSlow(uint8_t* buf, uint32_t len); - - virtual void writeSlow(const uint8_t* buf, uint32_t len); - - virtual void flush(); - - uint32_t readEnd(); - - uint32_t writeEnd(); - - const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); - - boost::shared_ptr<TTransport> getUnderlyingTransport() { - return transport_; - } - - /* - * TVirtualTransport provides a default implementation of readAll(). - * We want to use the TBufferBase version instead. - */ - uint32_t readAll(uint8_t* buf, uint32_t len) { - return TBufferBase::readAll(buf,len); - } - - protected: - /** - * Reads a frame of input from the underlying stream. - * - * Returns true if a frame was read successfully, or false on EOF. - * (Raises a TTransportException if EOF occurs after a partial frame.) - */ - bool readFrame(); - - void initPointers() { - setReadBuffer(NULL, 0); - setWriteBuffer(wBuf_.get(), wBufSize_); - - // Pad the buffer so we can insert the size later. - int32_t pad = 0; - this->write((uint8_t*)&pad, sizeof(pad)); - } - - boost::shared_ptr<TTransport> transport_; - - uint32_t rBufSize_; - uint32_t wBufSize_; - boost::scoped_array<uint8_t> rBuf_; - boost::scoped_array<uint8_t> wBuf_; -}; - -/** - * Wraps a transport into a framed one. - * - */ -class TFramedTransportFactory : public TTransportFactory { - public: - TFramedTransportFactory() {} - - virtual ~TFramedTransportFactory() {} - - /** - * Wraps the transport into a framed one. - */ - virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) { - return boost::shared_ptr<TTransport>(new TFramedTransport(trans)); - } - -}; - - -/** - * A memory buffer is a tranpsort that simply reads from and writes to an - * in memory buffer. Anytime you call write on it, the data is simply placed - * into a buffer, and anytime you call read, data is read from that buffer. - * - * The buffers are allocated using C constructs malloc,realloc, and the size - * doubles as necessary. We've considered using scoped - * - */ -class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { - private: - - // Common initialization done by all constructors. - void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { - if (buf == NULL && size != 0) { - assert(owner); - buf = (uint8_t*)std::malloc(size); - if (buf == NULL) { - throw std::bad_alloc(); - } - } - - buffer_ = buf; - bufferSize_ = size; - - rBase_ = buffer_; - rBound_ = buffer_ + wPos; - // TODO(dreiss): Investigate NULL-ing this if !owner. - wBase_ = buffer_ + wPos; - wBound_ = buffer_ + bufferSize_; - - owner_ = owner; - - // rBound_ is really an artifact. In principle, it should always be - // equal to wBase_. We update it in a few places (computeRead, etc.). - } - - public: - static const uint32_t defaultSize = 1024; - - /** - * This enum specifies how a TMemoryBuffer should treat - * memory passed to it via constructors or resetBuffer. - * - * OBSERVE: - * TMemoryBuffer will simply store a pointer to the memory. - * It is the callers responsibility to ensure that the pointer - * remains valid for the lifetime of the TMemoryBuffer, - * and that it is properly cleaned up. - * Note that no data can be written to observed buffers. - * - * COPY: - * TMemoryBuffer will make an internal copy of the buffer. - * The caller has no responsibilities. - * - * TAKE_OWNERSHIP: - * TMemoryBuffer will become the "owner" of the buffer, - * and will be responsible for freeing it. - * The membory must have been allocated with malloc. - */ - enum MemoryPolicy - { OBSERVE = 1 - , COPY = 2 - , TAKE_OWNERSHIP = 3 - }; - - /** - * Construct a TMemoryBuffer with a default-sized buffer, - * owned by the TMemoryBuffer object. - */ - TMemoryBuffer() { - initCommon(NULL, defaultSize, true, 0); - } - - /** - * Construct a TMemoryBuffer with a buffer of a specified size, - * owned by the TMemoryBuffer object. - * - * @param sz The initial size of the buffer. - */ - TMemoryBuffer(uint32_t sz) { - initCommon(NULL, sz, true, 0); - } - - /** - * Construct a TMemoryBuffer with buf as its initial contents. - * - * @param buf The initial contents of the buffer. - * Note that, while buf is a non-const pointer, - * TMemoryBuffer will not write to it if policy == OBSERVE, - * so it is safe to const_cast<uint8_t*>(whatever). - * @param sz The size of @c buf. - * @param policy See @link MemoryPolicy @endlink . - */ - TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { - if (buf == NULL && sz != 0) { - throw TTransportException(TTransportException::BAD_ARGS, - "TMemoryBuffer given null buffer with non-zero size."); - } - - switch (policy) { - case OBSERVE: - case TAKE_OWNERSHIP: - initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); - break; - case COPY: - initCommon(NULL, sz, true, 0); - this->write(buf, sz); - break; - default: - throw TTransportException(TTransportException::BAD_ARGS, - "Invalid MemoryPolicy for TMemoryBuffer"); - } - } - - ~TMemoryBuffer() { - if (owner_) { - std::free(buffer_); - } - } - - bool isOpen() { - return true; - } - - bool peek() { - return (rBase_ < wBase_); - } - - void open() {} - - void close() {} - - // TODO(dreiss): Make bufPtr const. - void getBuffer(uint8_t** bufPtr, uint32_t* sz) { - *bufPtr = rBase_; - *sz = static_cast<uint32_t>(wBase_ - rBase_); - } - - std::string getBufferAsString() { - if (buffer_ == NULL) { - return ""; - } - uint8_t* buf; - uint32_t sz; - getBuffer(&buf, &sz); - return std::string((char*)buf, (std::string::size_type)sz); - } - - void appendBufferToString(std::string& str) { - if (buffer_ == NULL) { - return; - } - uint8_t* buf; - uint32_t sz; - getBuffer(&buf, &sz); - str.append((char*)buf, sz); - } - - void resetBuffer() { - rBase_ = buffer_; - rBound_ = buffer_; - wBase_ = buffer_; - // It isn't safe to write into a buffer we don't own. - if (!owner_) { - wBound_ = wBase_; - bufferSize_ = 0; - } - } - - /// See constructor documentation. - void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { - // Use a variant of the copy-and-swap trick for assignment operators. - // This is sub-optimal in terms of performance for two reasons: - // 1/ The constructing and swapping of the (small) values - // in the temporary object takes some time, and is not necessary. - // 2/ If policy == COPY, we allocate the new buffer before - // freeing the old one, precluding the possibility of - // reusing that memory. - // I doubt that either of these problems could be optimized away, - // but the second is probably no a common case, and the first is minor. - // I don't expect resetBuffer to be a common operation, so I'm willing to - // bite the performance bullet to make the method this simple. - - // Construct the new buffer. - TMemoryBuffer new_buffer(buf, sz, policy); - // Move it into ourself. - this->swap(new_buffer); - // Our old self gets destroyed. - } - - /// See constructor documentation. - void resetBuffer(uint32_t sz) { - // Construct the new buffer. - TMemoryBuffer new_buffer(sz); - // Move it into ourself. - this->swap(new_buffer); - // Our old self gets destroyed. - } - - std::string readAsString(uint32_t len) { - std::string str; - (void)readAppendToString(str, len); - return str; - } - - uint32_t readAppendToString(std::string& str, uint32_t len); - - // return number of bytes read - uint32_t readEnd() { - //This cast should be safe, because buffer_'s size is a uint32_t - uint32_t bytes = static_cast<uint32_t>(rBase_ - buffer_); - if (rBase_ == wBase_) { - resetBuffer(); - } - return bytes; - } - - // Return number of bytes written - uint32_t writeEnd() { - //This cast should be safe, because buffer_'s size is a uint32_t - return static_cast<uint32_t>(wBase_ - buffer_); - } - - uint32_t available_read() const { - // Remember, wBase_ is the real rBound_. - return static_cast<uint32_t>(wBase_ - rBase_); - } - - uint32_t available_write() const { - return static_cast<uint32_t>(wBound_ - wBase_); - } - - // Returns a pointer to where the client can write data to append to - // the TMemoryBuffer, and ensures the buffer is big enough to accomodate a - // write of the provided length. The returned pointer is very convenient for - // passing to read(), recv(), or similar. You must call wroteBytes() as soon - // as data is written or the buffer will not be aware that data has changed. - uint8_t* getWritePtr(uint32_t len) { - ensureCanWrite(len); - return wBase_; - } - - // Informs the buffer that the client has written 'len' bytes into storage - // that had been provided by getWritePtr(). - void wroteBytes(uint32_t len); - - /* - * TVirtualTransport provides a default implementation of readAll(). - * We want to use the TBufferBase version instead. - */ - uint32_t readAll(uint8_t* buf, uint32_t len) { - return TBufferBase::readAll(buf,len); - } - - protected: - void swap(TMemoryBuffer& that) { - using std::swap; - swap(buffer_, that.buffer_); - swap(bufferSize_, that.bufferSize_); - - swap(rBase_, that.rBase_); - swap(rBound_, that.rBound_); - swap(wBase_, that.wBase_); - swap(wBound_, that.wBound_); - - swap(owner_, that.owner_); - } - - // Make sure there's at least 'len' bytes available for writing. - void ensureCanWrite(uint32_t len); - - // Compute the position and available data for reading. - void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); - - uint32_t readSlow(uint8_t* buf, uint32_t len); - - void writeSlow(const uint8_t* buf, uint32_t len); - - const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len); - - // Data buffer - uint8_t* buffer_; - - // Allocated buffer size - uint32_t bufferSize_; - - // Is this object the owner of the buffer? - bool owner_; - - // Don't forget to update constrctors, initCommon, and swap if - // you add new members. -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_
http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp deleted file mode 100644 index 3b72de5..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.cpp +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <cerrno> -#include <exception> - -#include <thrift/transport/TFDTransport.h> -#include <thrift/transport/PlatformSocket.h> - -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif - -#ifdef _WIN32 -#include <io.h> -#endif - -using namespace std; - -namespace apache { namespace thrift { namespace transport { - -void TFDTransport::close() { - if (!isOpen()) { - return; - } - - int rv = ::THRIFT_CLOSESOCKET(fd_); - int errno_copy = THRIFT_GET_SOCKET_ERROR; - fd_ = -1; - // Have to check uncaught_exception because this is called in the destructor. - if (rv < 0 && !std::uncaught_exception()) { - throw TTransportException(TTransportException::UNKNOWN, - "TFDTransport::close()", - errno_copy); - } -} - -uint32_t TFDTransport::read(uint8_t* buf, uint32_t len) { - unsigned int maxRetries = 5; // same as the TSocket default - unsigned int retries = 0; - while (true) { - THRIFT_SSIZET rv = ::read(fd_, buf, len); - if (rv < 0) { - if (THRIFT_GET_SOCKET_ERROR == THRIFT_EINTR && retries < maxRetries) { - // If interrupted, try again - ++retries; - continue; - } - int errno_copy = THRIFT_GET_SOCKET_ERROR; - throw TTransportException(TTransportException::UNKNOWN, - "TFDTransport::read()", - errno_copy); - } - //this should be fine, since we already checked for negative values, - //and ::read should only return a 32-bit value since len is 32-bit. - return static_cast<uint32_t>(rv); - } -} - -void TFDTransport::write(const uint8_t* buf, uint32_t len) { - while (len > 0) { - THRIFT_SSIZET rv = ::write(fd_, buf, len); - - if (rv < 0) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - throw TTransportException(TTransportException::UNKNOWN, - "TFDTransport::write()", - errno_copy); - } else if (rv == 0) { - throw TTransportException(TTransportException::END_OF_FILE, - "TFDTransport::write()"); - } - - buf += rv; - //this should be fine, as we've already checked for negative values, and - //::write shouldn't return more than a uint32_t since len is a uint32_t - len -= static_cast<uint32_t>(rv); - } -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h deleted file mode 100644 index cc4f9c1..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFDTransport.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_ -#define _THRIFT_TRANSPORT_TFDTRANSPORT_H_ 1 - -#include <string> -#ifdef HAVE_SYS_TIME_H -#include <sys/time.h> -#endif - -#include <thrift/transport/TTransport.h> -#include <thrift/transport/TVirtualTransport.h> - -namespace apache { namespace thrift { namespace transport { - -/** - * Dead-simple wrapper around a file descriptor. - * - */ -class TFDTransport : public TVirtualTransport<TFDTransport> { - public: - enum ClosePolicy - { NO_CLOSE_ON_DESTROY = 0 - , CLOSE_ON_DESTROY = 1 - }; - - TFDTransport(int fd, ClosePolicy close_policy = NO_CLOSE_ON_DESTROY) - : fd_(fd) - , close_policy_(close_policy) - {} - - ~TFDTransport() { - if (close_policy_ == CLOSE_ON_DESTROY) { - close(); - } - } - - bool isOpen() { return fd_ >= 0; } - - void open() {} - - void close(); - - uint32_t read(uint8_t* buf, uint32_t len); - - void write(const uint8_t* buf, uint32_t len); - - void setFD(int fd) { fd_ = fd; } - int getFD() { return fd_; } - - protected: - int fd_; - ClosePolicy close_policy_; -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_TFDTRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp deleted file mode 100644 index c94ecd2..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.cpp +++ /dev/null @@ -1,1069 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <thrift/thrift-config.h> - -#include <thrift/transport/TFileTransport.h> -#include <thrift/transport/TTransportUtils.h> -#include <thrift/transport/PlatformSocket.h> -#include <thrift/concurrency/FunctionRunner.h> - -#ifdef HAVE_SYS_TIME_H -#include <sys/time.h> -#else -#include <time.h> -#endif -#include <fcntl.h> -#ifdef HAVE_UNISTD_H -#include <unistd.h> -#endif -#ifdef HAVE_STRINGS_H -#include <strings.h> -#endif -#include <cstdlib> -#include <cstring> -#include <iostream> -#include <limits> -#ifdef HAVE_SYS_STAT_H -#include <sys/stat.h> -#endif - -#ifdef _WIN32 -#include <io.h> -#endif - -namespace apache { namespace thrift { namespace transport { - -using boost::scoped_ptr; -using boost::shared_ptr; -using namespace std; -using namespace apache::thrift::protocol; -using namespace apache::thrift::concurrency; - -TFileTransport::TFileTransport(string path, bool readOnly) - : readState_() - , readBuff_(NULL) - , currentEvent_(NULL) - , readBuffSize_(DEFAULT_READ_BUFF_SIZE) - , readTimeout_(NO_TAIL_READ_TIMEOUT) - , chunkSize_(DEFAULT_CHUNK_SIZE) - , eventBufferSize_(DEFAULT_EVENT_BUFFER_SIZE) - , flushMaxUs_(DEFAULT_FLUSH_MAX_US) - , flushMaxBytes_(DEFAULT_FLUSH_MAX_BYTES) - , maxEventSize_(DEFAULT_MAX_EVENT_SIZE) - , maxCorruptedEvents_(DEFAULT_MAX_CORRUPTED_EVENTS) - , eofSleepTime_(DEFAULT_EOF_SLEEP_TIME_US) - , corruptedEventSleepTime_(DEFAULT_CORRUPTED_SLEEP_TIME_US) - , writerThreadIOErrorSleepTime_(DEFAULT_WRITER_THREAD_SLEEP_TIME_US) - , dequeueBuffer_(NULL) - , enqueueBuffer_(NULL) - , notFull_(&mutex_) - , notEmpty_(&mutex_) - , closing_(false) - , flushed_(&mutex_) - , forceFlush_(false) - , filename_(path) - , fd_(0) - , bufferAndThreadInitialized_(false) - , offset_(0) - , lastBadChunk_(0) - , numCorruptedEventsInChunk_(0) - , readOnly_(readOnly) -{ - threadFactory_.setDetached(false); - openLogFile(); -} - -void TFileTransport::resetOutputFile(int fd, string filename, off_t offset) { - filename_ = filename; - offset_ = offset; - - // check if current file is still open - if (fd_ > 0) { - // flush any events in the queue - flush(); - GlobalOutput.printf("error, current file (%s) not closed", filename_.c_str()); - if (-1 == ::THRIFT_CLOSESOCKET(fd_)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: resetOutputFile() ::close() ", errno_copy); - throw TTransportException(TTransportException::UNKNOWN, "TFileTransport: error in file close", errno_copy); - } else { - //successfully closed fd - fd_ = 0; - } - } - - if (fd) { - fd_ = fd; - } else { - // open file if the input fd is 0 - openLogFile(); - } -} - - -TFileTransport::~TFileTransport() { - // flush the buffer if a writer thread is active - if(writerThread_.get()) { - // set state to closing - closing_ = true; - - // wake up the writer thread - // Since closing_ is true, it will attempt to flush all data, then exit. - notEmpty_.notify(); - - writerThread_->join(); - writerThread_.reset(); - } - - if (dequeueBuffer_) { - delete dequeueBuffer_; - dequeueBuffer_ = NULL; - } - - if (enqueueBuffer_) { - delete enqueueBuffer_; - enqueueBuffer_ = NULL; - } - - if (readBuff_) { - delete[] readBuff_; - readBuff_ = NULL; - } - - if (currentEvent_) { - delete currentEvent_; - currentEvent_ = NULL; - } - - // close logfile - if (fd_ > 0) { - if(-1 == ::THRIFT_CLOSESOCKET(fd_)) { - GlobalOutput.perror("TFileTransport: ~TFileTransport() ::close() ", THRIFT_GET_SOCKET_ERROR); - } else { - //successfully closed fd - fd_ = 0; - } - } -} - -bool TFileTransport::initBufferAndWriteThread() { - if (bufferAndThreadInitialized_) { - T_ERROR("%s", "Trying to double-init TFileTransport"); - return false; - } - - if(!writerThread_.get()) { - writerThread_ = threadFactory_.newThread( - apache::thrift::concurrency::FunctionRunner::create(startWriterThread, this)); - writerThread_->start(); - } - - dequeueBuffer_ = new TFileTransportBuffer(eventBufferSize_); - enqueueBuffer_ = new TFileTransportBuffer(eventBufferSize_); - bufferAndThreadInitialized_ = true; - - return true; -} - -void TFileTransport::write(const uint8_t* buf, uint32_t len) { - if (readOnly_) { - throw TTransportException("TFileTransport: attempting to write to file opened readonly"); - } - - enqueueEvent(buf, len); -} - -void TFileTransport::enqueueEvent(const uint8_t* buf, uint32_t eventLen) { - // can't enqueue more events if file is going to close - if (closing_) { - return; - } - - // make sure that event size is valid - if ( (maxEventSize_ > 0) && (eventLen > maxEventSize_) ) { - T_ERROR("msg size is greater than max event size: %u > %u\n", eventLen, maxEventSize_); - return; - } - - if (eventLen == 0) { - T_ERROR("%s", "cannot enqueue an empty event"); - return; - } - - eventInfo* toEnqueue = new eventInfo(); - toEnqueue->eventBuff_ = (uint8_t *)std::malloc((sizeof(uint8_t) * eventLen) + 4); - if (toEnqueue->eventBuff_ == NULL) { - delete toEnqueue; - throw std::bad_alloc(); - } - // first 4 bytes is the event length - memcpy(toEnqueue->eventBuff_, (void*)(&eventLen), 4); - // actual event contents - memcpy(toEnqueue->eventBuff_ + 4, buf, eventLen); - toEnqueue->eventSize_ = eventLen + 4; - - // lock mutex - Guard g(mutex_); - - // make sure that enqueue buffer is initialized and writer thread is running - if (!bufferAndThreadInitialized_) { - if (!initBufferAndWriteThread()) { - delete toEnqueue; - return; - } - } - - // Can't enqueue while buffer is full - while (enqueueBuffer_->isFull()) { - notFull_.wait(); - } - - // We shouldn't be trying to enqueue new data while a forced flush is - // requested. (Otherwise the writer thread might not ever be able to finish - // the flush if more data keeps being enqueued.) - assert(!forceFlush_); - - // add to the buffer - if (!enqueueBuffer_->addEvent(toEnqueue)) { - delete toEnqueue; - return; - } - - // signal anybody who's waiting for the buffer to be non-empty - notEmpty_.notify(); - - // this really should be a loop where it makes sure it got flushed - // because condition variables can get triggered by the os for no reason - // it is probably a non-factor for the time being -} - -bool TFileTransport::swapEventBuffers(struct timeval* deadline) { - bool swap; - Guard g(mutex_); - - if (!enqueueBuffer_->isEmpty()) { - swap = true; - } else if (closing_) { - // even though there is no data to write, - // return immediately if the transport is closing - swap = false; - } else { - if (deadline != NULL) { - // if we were handed a deadline time struct, do a timed wait - notEmpty_.waitForTime(deadline); - } else { - // just wait until the buffer gets an item - notEmpty_.wait(); - } - - // could be empty if we timed out - swap = enqueueBuffer_->isEmpty(); - } - - if (swap) { - TFileTransportBuffer *temp = enqueueBuffer_; - enqueueBuffer_ = dequeueBuffer_; - dequeueBuffer_ = temp; - } - - - if (swap) { - notFull_.notify(); - } - - return swap; -} - - -void TFileTransport::writerThread() { - bool hasIOError = false; - - // open file if it is not open - if(!fd_) { - try { - openLogFile(); - } catch (...) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: writerThread() openLogFile() ", errno_copy); - fd_ = 0; - hasIOError = true; - } - } - - // set the offset to the correct value (EOF) - if (!hasIOError) { - try { - seekToEnd(); - // throw away any partial events - offset_ += readState_.lastDispatchPtr_; -#ifndef _WIN32 - ftruncate(fd_, offset_); -#else - _chsize_s(fd_, offset_); -#endif - readState_.resetAllValues(); - } catch (...) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: writerThread() initialization ", errno_copy); - hasIOError = true; - } - } - - // Figure out the next time by which a flush must take place - struct timeval ts_next_flush; - getNextFlushTime(&ts_next_flush); - uint32_t unflushed = 0; - - while (1) { - // this will only be true when the destructor is being invoked - if (closing_) { - if (hasIOError) { - return; - } - - // Try to empty buffers before exit - if (enqueueBuffer_->isEmpty() && dequeueBuffer_->isEmpty()) { -#ifndef _WIN32 - fsync(fd_); -#endif - if (-1 == ::THRIFT_CLOSESOCKET(fd_)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: writerThread() ::close() ", errno_copy); - } else { - //fd successfully closed - fd_ = 0; - } - return; - } - } - - if (swapEventBuffers(&ts_next_flush)) { - eventInfo* outEvent; - while (NULL != (outEvent = dequeueBuffer_->getNext())) { - // Remove an event from the buffer and write it out to disk. If there is any IO error, for instance, - // the output file is unmounted or deleted, then this event is dropped. However, the writer thread - // will: (1) sleep for a short while; (2) try to reopen the file; (3) if successful then start writing - // from the end. - - while (hasIOError) { - T_ERROR("TFileTransport: writer thread going to sleep for %d microseconds due to IO errors", writerThreadIOErrorSleepTime_); - THRIFT_SLEEP_USEC(writerThreadIOErrorSleepTime_); - if (closing_) { - return; - } - if (!fd_) { - ::THRIFT_CLOSESOCKET(fd_); - fd_ = 0; - } - try { - openLogFile(); - seekToEnd(); - unflushed = 0; - hasIOError = false; - T_LOG_OPER("TFileTransport: log file %s reopened by writer thread during error recovery", filename_.c_str()); - } catch (...) { - T_ERROR("TFileTransport: unable to reopen log file %s during error recovery", filename_.c_str()); - } - } - - // sanity check on event - if ((maxEventSize_ > 0) && (outEvent->eventSize_ > maxEventSize_)) { - T_ERROR("msg size is greater than max event size: %u > %u\n", outEvent->eventSize_, maxEventSize_); - continue; - } - - // If chunking is required, then make sure that msg does not cross chunk boundary - if ((outEvent->eventSize_ > 0) && (chunkSize_ != 0)) { - // event size must be less than chunk size - if (outEvent->eventSize_ > chunkSize_) { - T_ERROR("TFileTransport: event size(%u) > chunk size(%u): skipping event", outEvent->eventSize_, chunkSize_); - continue; - } - - int64_t chunk1 = offset_/chunkSize_; - int64_t chunk2 = (offset_ + outEvent->eventSize_ - 1)/chunkSize_; - - // if adding this event will cross a chunk boundary, pad the chunk with zeros - if (chunk1 != chunk2) { - // refetch the offset to keep in sync - offset_ = lseek(fd_, 0, SEEK_CUR); - int32_t padding = (int32_t)((offset_ / chunkSize_ + 1) * chunkSize_ - offset_); - - uint8_t* zeros = new uint8_t[padding]; - memset(zeros, '\0', padding); - boost::scoped_array<uint8_t> array(zeros); - if (-1 == ::write(fd_, zeros, padding)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: writerThread() error while padding zeros ", errno_copy); - hasIOError = true; - continue; - } - unflushed += padding; - offset_ += padding; - } - } - - // write the dequeued event to the file - if (outEvent->eventSize_ > 0) { - if (-1 == ::write(fd_, outEvent->eventBuff_, outEvent->eventSize_)) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: error while writing event ", errno_copy); - hasIOError = true; - continue; - } - unflushed += outEvent->eventSize_; - offset_ += outEvent->eventSize_; - } - } - dequeueBuffer_->reset(); - } - - if (hasIOError) { - continue; - } - - // Local variable to cache the state of forceFlush_. - // - // We only want to check the value of forceFlush_ once each time around the - // loop. If we check it more than once without holding the lock the entire - // time, it could have changed state in between. This will result in us - // making inconsistent decisions. - bool forced_flush = false; - { - Guard g(mutex_); - if (forceFlush_) { - if (!enqueueBuffer_->isEmpty()) { - // If forceFlush_ is true, we need to flush all available data. - // If enqueueBuffer_ is not empty, go back to the start of the loop to - // write it out. - // - // We know the main thread is waiting on forceFlush_ to be cleared, - // so no new events will be added to enqueueBuffer_ until we clear - // forceFlush_. Therefore the next time around the loop enqueueBuffer_ - // is guaranteed to be empty. (I.e., we're guaranteed to make progress - // and clear forceFlush_ the next time around the loop.) - continue; - } - forced_flush = true; - } - } - - // determine if we need to perform an fsync - bool flush = false; - if (forced_flush || unflushed > flushMaxBytes_) { - flush = true; - } else { - struct timeval current_time; - THRIFT_GETTIMEOFDAY(¤t_time, NULL); - if (current_time.tv_sec > ts_next_flush.tv_sec || - (current_time.tv_sec == ts_next_flush.tv_sec && - current_time.tv_usec > ts_next_flush.tv_usec)) { - if (unflushed > 0) { - flush = true; - } else { - // If there is no new data since the last fsync, - // don't perform the fsync, but do reset the timer. - getNextFlushTime(&ts_next_flush); - } - } - } - - if (flush) { - // sync (force flush) file to disk -#ifndef _WIN32 - fsync(fd_); -#endif - unflushed = 0; - getNextFlushTime(&ts_next_flush); - - // notify anybody waiting for flush completion - if (forced_flush) { - Guard g(mutex_); - forceFlush_ = false; - assert(enqueueBuffer_->isEmpty()); - assert(dequeueBuffer_->isEmpty()); - flushed_.notifyAll(); - } - } - } -} - -void TFileTransport::flush() { - // file must be open for writing for any flushing to take place - if (!writerThread_.get()) { - return; - } - // wait for flush to take place - Guard g(mutex_); - - // Indicate that we are requesting a flush - forceFlush_ = true; - // Wake up the writer thread so it will perform the flush immediately - notEmpty_.notify(); - - while (forceFlush_) { - flushed_.wait(); - } -} - - -uint32_t TFileTransport::readAll(uint8_t* buf, uint32_t len) { - uint32_t have = 0; - uint32_t get = 0; - - while (have < len) { - get = read(buf+have, len-have); - if (get <= 0) { - throw TEOFException(); - } - have += get; - } - - return have; -} - -bool TFileTransport::peek() { - // check if there is an event ready to be read - if (!currentEvent_) { - currentEvent_ = readEvent(); - } - - // did not manage to read an event from the file. This could have happened - // if the timeout expired or there was some other error - if (!currentEvent_) { - return false; - } - - // check if there is anything to read - return (currentEvent_->eventSize_ - currentEvent_->eventBuffPos_) > 0; -} - -uint32_t TFileTransport::read(uint8_t* buf, uint32_t len) { - // check if there an event is ready to be read - if (!currentEvent_) { - currentEvent_ = readEvent(); - } - - // did not manage to read an event from the file. This could have happened - // if the timeout expired or there was some other error - if (!currentEvent_) { - return 0; - } - - // read as much of the current event as possible - int32_t remaining = currentEvent_->eventSize_ - currentEvent_->eventBuffPos_; - if (remaining <= (int32_t)len) { - // copy over anything thats remaining - if (remaining > 0) { - memcpy(buf, - currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, - remaining); - } - delete(currentEvent_); - currentEvent_ = NULL; - return remaining; - } - - // read as much as possible - memcpy(buf, currentEvent_->eventBuff_ + currentEvent_->eventBuffPos_, len); - currentEvent_->eventBuffPos_ += len; - return len; -} - -// note caller is responsible for freeing returned events -eventInfo* TFileTransport::readEvent() { - int readTries = 0; - - if (!readBuff_) { - readBuff_ = new uint8_t[readBuffSize_]; - } - - while (1) { - // read from the file if read buffer is exhausted - if (readState_.bufferPtr_ == readState_.bufferLen_) { - // advance the offset pointer - offset_ += readState_.bufferLen_; - readState_.bufferLen_ = static_cast<uint32_t>(::read(fd_, readBuff_, readBuffSize_)); - // if (readState_.bufferLen_) { - // T_DEBUG_L(1, "Amount read: %u (offset: %lu)", readState_.bufferLen_, offset_); - // } - readState_.bufferPtr_ = 0; - readState_.lastDispatchPtr_ = 0; - - // read error - if (readState_.bufferLen_ == -1) { - readState_.resetAllValues(); - GlobalOutput("TFileTransport: error while reading from file"); - throw TTransportException("TFileTransport: error while reading from file"); - } else if (readState_.bufferLen_ == 0) { // EOF - // wait indefinitely if there is no timeout - if (readTimeout_ == TAIL_READ_TIMEOUT) { - THRIFT_SLEEP_USEC(eofSleepTime_); - continue; - } else if (readTimeout_ == NO_TAIL_READ_TIMEOUT) { - // reset state - readState_.resetState(0); - return NULL; - } else if (readTimeout_ > 0) { - // timeout already expired once - if (readTries > 0) { - readState_.resetState(0); - return NULL; - } else { - THRIFT_SLEEP_USEC(readTimeout_ * 1000); - readTries++; - continue; - } - } - } - } - - readTries = 0; - - // attempt to read an event from the buffer - while(readState_.bufferPtr_ < readState_.bufferLen_) { - if (readState_.readingSize_) { - if(readState_.eventSizeBuffPos_ == 0) { - if ( (offset_ + readState_.bufferPtr_)/chunkSize_ != - ((offset_ + readState_.bufferPtr_ + 3)/chunkSize_)) { - // skip one byte towards chunk boundary - // T_DEBUG_L(1, "Skipping a byte"); - readState_.bufferPtr_++; - continue; - } - } - - readState_.eventSizeBuff_[readState_.eventSizeBuffPos_++] = - readBuff_[readState_.bufferPtr_++]; - - if (readState_.eventSizeBuffPos_ == 4) { - if (readState_.getEventSize() == 0) { - // 0 length event indicates padding - // T_DEBUG_L(1, "Got padding"); - readState_.resetState(readState_.lastDispatchPtr_); - continue; - } - // got a valid event - readState_.readingSize_ = false; - if (readState_.event_) { - delete(readState_.event_); - } - readState_.event_ = new eventInfo(); - readState_.event_->eventSize_ = readState_.getEventSize(); - - // check if the event is corrupted and perform recovery if required - if (isEventCorrupted()) { - performRecovery(); - // start from the top - break; - } - } - } else { - if (!readState_.event_->eventBuff_) { - readState_.event_->eventBuff_ = new uint8_t[readState_.event_->eventSize_]; - readState_.event_->eventBuffPos_ = 0; - } - // take either the entire event or the remaining bytes in the buffer - int reclaimBuffer = min((uint32_t)(readState_.bufferLen_ - readState_.bufferPtr_), - readState_.event_->eventSize_ - readState_.event_->eventBuffPos_); - - // copy data from read buffer into event buffer - memcpy(readState_.event_->eventBuff_ + readState_.event_->eventBuffPos_, - readBuff_ + readState_.bufferPtr_, - reclaimBuffer); - - // increment position ptrs - readState_.event_->eventBuffPos_ += reclaimBuffer; - readState_.bufferPtr_ += reclaimBuffer; - - // check if the event has been read in full - if (readState_.event_->eventBuffPos_ == readState_.event_->eventSize_) { - // set the completed event to the current event - eventInfo* completeEvent = readState_.event_; - completeEvent->eventBuffPos_ = 0; - - readState_.event_ = NULL; - readState_.resetState(readState_.bufferPtr_); - - // exit criteria - return completeEvent; - } - } - } - - } -} - -bool TFileTransport::isEventCorrupted() { - // an error is triggered if: - if ( (maxEventSize_ > 0) && (readState_.event_->eventSize_ > maxEventSize_)) { - // 1. Event size is larger than user-speficied max-event size - T_ERROR("Read corrupt event. Event size(%u) greater than max event size (%u)", - readState_.event_->eventSize_, maxEventSize_); - return true; - } else if (readState_.event_->eventSize_ > chunkSize_) { - // 2. Event size is larger than chunk size - T_ERROR("Read corrupt event. Event size(%u) greater than chunk size (%u)", - readState_.event_->eventSize_, chunkSize_); - return true; - } else if( ((offset_ + readState_.bufferPtr_ - 4)/chunkSize_) != - ((offset_ + readState_.bufferPtr_ + readState_.event_->eventSize_ - 1)/chunkSize_) ) { - // 3. size indicates that event crosses chunk boundary - T_ERROR("Read corrupt event. Event crosses chunk boundary. Event size:%u Offset:%lu", - readState_.event_->eventSize_, - static_cast<unsigned long>(offset_ + readState_.bufferPtr_ + 4)); - - return true; - } - - return false; -} - -void TFileTransport::performRecovery() { - // perform some kickass recovery - uint32_t curChunk = getCurChunk(); - if (lastBadChunk_ == curChunk) { - numCorruptedEventsInChunk_++; - } else { - lastBadChunk_ = curChunk; - numCorruptedEventsInChunk_ = 1; - } - - if (numCorruptedEventsInChunk_ < maxCorruptedEvents_) { - // maybe there was an error in reading the file from disk - // seek to the beginning of chunk and try again - seekToChunk(curChunk); - } else { - - // just skip ahead to the next chunk if we not already at the last chunk - if (curChunk != (getNumChunks() - 1)) { - seekToChunk(curChunk + 1); - } else if (readTimeout_ == TAIL_READ_TIMEOUT) { - // if tailing the file, wait until there is enough data to start - // the next chunk - while(curChunk == (getNumChunks() - 1)) { - THRIFT_SLEEP_USEC(DEFAULT_CORRUPTED_SLEEP_TIME_US); - } - seekToChunk(curChunk + 1); - } else { - // pretty hosed at this stage, rewind the file back to the last successful - // point and punt on the error - readState_.resetState(readState_.lastDispatchPtr_); - currentEvent_ = NULL; - char errorMsg[1024]; - sprintf(errorMsg, "TFileTransport: log file corrupted at offset: %lu", - static_cast<unsigned long>(offset_ + readState_.lastDispatchPtr_)); - - GlobalOutput(errorMsg); - throw TTransportException(errorMsg); - } - } - -} - -void TFileTransport::seekToChunk(int32_t chunk) { - if (fd_ <= 0) { - throw TTransportException("File not open"); - } - - int32_t numChunks = getNumChunks(); - - // file is empty, seeking to chunk is pointless - if (numChunks == 0) { - return; - } - - // negative indicates reverse seek (from the end) - if (chunk < 0) { - chunk += numChunks; - } - - // too large a value for reverse seek, just seek to beginning - if (chunk < 0) { - T_DEBUG("%s", "Incorrect value for reverse seek. Seeking to beginning..."); - chunk = 0; - } - - // cannot seek past EOF - bool seekToEnd = false; - off_t minEndOffset = 0; - if (chunk >= numChunks) { - T_DEBUG("%s", "Trying to seek past EOF. Seeking to EOF instead..."); - seekToEnd = true; - chunk = numChunks - 1; - // this is the min offset to process events till - minEndOffset = lseek(fd_, 0, SEEK_END); - } - - off_t newOffset = off_t(chunk) * chunkSize_; - offset_ = lseek(fd_, newOffset, SEEK_SET); - readState_.resetAllValues(); - currentEvent_ = NULL; - if (offset_ == -1) { - GlobalOutput("TFileTransport: lseek error in seekToChunk"); - throw TTransportException("TFileTransport: lseek error in seekToChunk"); - } - - // seek to EOF if user wanted to go to last chunk - if (seekToEnd) { - uint32_t oldReadTimeout = getReadTimeout(); - setReadTimeout(NO_TAIL_READ_TIMEOUT); - // keep on reading unti the last event at point of seekChunk call - boost::scoped_ptr<eventInfo> event; - while ((offset_ + readState_.bufferPtr_) < minEndOffset) { - event.reset(readEvent()); - if (event.get() == NULL) { - break; - } - } - setReadTimeout(oldReadTimeout); - } - -} - -void TFileTransport::seekToEnd() { - seekToChunk(getNumChunks()); -} - -uint32_t TFileTransport::getNumChunks() { - if (fd_ <= 0) { - return 0; - } - - struct stat f_info; - int rv = fstat(fd_, &f_info); - - if (rv < 0) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - throw TTransportException(TTransportException::UNKNOWN, - "TFileTransport::getNumChunks() (fstat)", - errno_copy); - } - - if (f_info.st_size > 0) { - size_t numChunks = ((f_info.st_size)/chunkSize_) + 1; - if (numChunks > (std::numeric_limits<uint32_t>::max)()) - throw TTransportException("Too many chunks"); - return static_cast<uint32_t>(numChunks); - } - - // empty file has no chunks - return 0; -} - -uint32_t TFileTransport::getCurChunk() { - return static_cast<uint32_t>(offset_/chunkSize_); -} - -// Utility Functions -void TFileTransport::openLogFile() { -#ifndef _WIN32 - mode_t mode = readOnly_ ? S_IRUSR | S_IRGRP | S_IROTH : S_IRUSR | S_IWUSR| S_IRGRP | S_IROTH; - int flags = readOnly_ ? O_RDONLY : O_RDWR | O_CREAT | O_APPEND; - fd_ = ::open(filename_.c_str(), flags, mode); -#else - int mode = readOnly_ ? _S_IREAD : _S_IREAD | _S_IWRITE; - int flags = readOnly_ ? _O_RDONLY : _O_RDWR | _O_CREAT | _O_APPEND; - fd_ = ::_open(filename_.c_str(), flags, mode); -#endif - offset_ = 0; - - // make sure open call was successful - if(fd_ == -1) { - int errno_copy = THRIFT_GET_SOCKET_ERROR; - GlobalOutput.perror("TFileTransport: openLogFile() ::open() file: " + filename_, errno_copy); - throw TTransportException(TTransportException::NOT_OPEN, filename_, errno_copy); - } - -} - -void TFileTransport::getNextFlushTime(struct timeval* ts_next_flush) { - THRIFT_GETTIMEOFDAY(ts_next_flush, NULL); - - ts_next_flush->tv_usec += flushMaxUs_; - if (ts_next_flush->tv_usec > 1000000) { - long extra_secs = ts_next_flush->tv_usec / 1000000; - ts_next_flush->tv_usec %= 1000000; - ts_next_flush->tv_sec += extra_secs; - } -} - -TFileTransportBuffer::TFileTransportBuffer(uint32_t size) - : bufferMode_(WRITE) - , writePoint_(0) - , readPoint_(0) - , size_(size) -{ - buffer_ = new eventInfo*[size]; -} - -TFileTransportBuffer::~TFileTransportBuffer() { - if (buffer_) { - for (uint32_t i = 0; i < writePoint_; i++) { - delete buffer_[i]; - } - delete[] buffer_; - buffer_ = NULL; - } -} - -bool TFileTransportBuffer::addEvent(eventInfo *event) { - if (bufferMode_ == READ) { - GlobalOutput("Trying to write to a buffer in read mode"); - } - if (writePoint_ < size_) { - buffer_[writePoint_++] = event; - return true; - } else { - // buffer is full - return false; - } -} - -eventInfo* TFileTransportBuffer::getNext() { - if (bufferMode_ == WRITE) { - bufferMode_ = READ; - } - if (readPoint_ < writePoint_) { - return buffer_[readPoint_++]; - } else { - // no more entries - return NULL; - } -} - -void TFileTransportBuffer::reset() { - if (bufferMode_ == WRITE || writePoint_ > readPoint_) { - T_DEBUG("%s", "Resetting a buffer with unread entries"); - } - // Clean up the old entries - for (uint32_t i = 0; i < writePoint_; i++) { - delete buffer_[i]; - } - bufferMode_ = WRITE; - writePoint_ = 0; - readPoint_ = 0; -} - -bool TFileTransportBuffer::isFull() { - return writePoint_ == size_; -} - -bool TFileTransportBuffer::isEmpty() { - return writePoint_ == 0; -} - -TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, - shared_ptr<TProtocolFactory> protocolFactory, - shared_ptr<TFileReaderTransport> inputTransport): - processor_(processor), - inputProtocolFactory_(protocolFactory), - outputProtocolFactory_(protocolFactory), - inputTransport_(inputTransport) { - - // default the output transport to a null transport (common case) - outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport()); -} - -TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, - shared_ptr<TProtocolFactory> inputProtocolFactory, - shared_ptr<TProtocolFactory> outputProtocolFactory, - shared_ptr<TFileReaderTransport> inputTransport): - processor_(processor), - inputProtocolFactory_(inputProtocolFactory), - outputProtocolFactory_(outputProtocolFactory), - inputTransport_(inputTransport) { - - // default the output transport to a null transport (common case) - outputTransport_ = shared_ptr<TNullTransport>(new TNullTransport()); -} - -TFileProcessor::TFileProcessor(shared_ptr<TProcessor> processor, - shared_ptr<TProtocolFactory> protocolFactory, - shared_ptr<TFileReaderTransport> inputTransport, - shared_ptr<TTransport> outputTransport): - processor_(processor), - inputProtocolFactory_(protocolFactory), - outputProtocolFactory_(protocolFactory), - inputTransport_(inputTransport), - outputTransport_(outputTransport) {} - -void TFileProcessor::process(uint32_t numEvents, bool tail) { - shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); - shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); - - // set the read timeout to 0 if tailing is required - int32_t oldReadTimeout = inputTransport_->getReadTimeout(); - if (tail) { - // save old read timeout so it can be restored - inputTransport_->setReadTimeout(TFileTransport::TAIL_READ_TIMEOUT); - } - - uint32_t numProcessed = 0; - while(1) { - // bad form to use exceptions for flow control but there is really - // no other way around it - try { - processor_->process(inputProtocol, outputProtocol, NULL); - numProcessed++; - if ( (numEvents > 0) && (numProcessed == numEvents)) { - return; - } - } catch (TEOFException&) { - if (!tail) { - break; - } - } catch (TException &te) { - cerr << te.what() << endl; - break; - } - } - - // restore old read timeout - if (tail) { - inputTransport_->setReadTimeout(oldReadTimeout); - } - -} - -void TFileProcessor::processChunk() { - shared_ptr<TProtocol> inputProtocol = inputProtocolFactory_->getProtocol(inputTransport_); - shared_ptr<TProtocol> outputProtocol = outputProtocolFactory_->getProtocol(outputTransport_); - - uint32_t curChunk = inputTransport_->getCurChunk(); - - while(1) { - // bad form to use exceptions for flow control but there is really - // no other way around it - try { - processor_->process(inputProtocol, outputProtocol, NULL); - if (curChunk != inputTransport_->getCurChunk()) { - break; - } - } catch (TEOFException&) { - break; - } catch (TException &te) { - cerr << te.what() << endl; - break; - } - } -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h deleted file mode 100644 index 75941cf..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/TFileTransport.h +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ -#define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 - -#include <thrift/transport/TTransport.h> -#include <thrift/Thrift.h> -#include <thrift/TProcessor.h> - -#include <string> -#include <stdio.h> - -#include <boost/scoped_ptr.hpp> -#include <boost/shared_ptr.hpp> - -#include <thrift/concurrency/Mutex.h> -#include <thrift/concurrency/Monitor.h> -#include <thrift/concurrency/PlatformThreadFactory.h> -#include <thrift/concurrency/Thread.h> - -namespace apache { namespace thrift { namespace transport { - -using apache::thrift::TProcessor; -using apache::thrift::protocol::TProtocolFactory; -using apache::thrift::concurrency::Mutex; -using apache::thrift::concurrency::Monitor; - -// Data pertaining to a single event -typedef struct eventInfo { - uint8_t* eventBuff_; - uint32_t eventSize_; - uint32_t eventBuffPos_; - - eventInfo():eventBuff_(NULL), eventSize_(0), eventBuffPos_(0){}; - ~eventInfo() { - if (eventBuff_) { - delete[] eventBuff_; - } - } -} eventInfo; - -// information about current read state -typedef struct readState { - eventInfo* event_; - - // keep track of event size - uint8_t eventSizeBuff_[4]; - uint8_t eventSizeBuffPos_; - bool readingSize_; - - // read buffer variables - int32_t bufferPtr_; - int32_t bufferLen_; - - // last successful dispatch point - int32_t lastDispatchPtr_; - - void resetState(uint32_t lastDispatchPtr) { - readingSize_ = true; - eventSizeBuffPos_ = 0; - lastDispatchPtr_ = lastDispatchPtr; - } - - void resetAllValues() { - resetState(0); - bufferPtr_ = 0; - bufferLen_ = 0; - if (event_) { - delete(event_); - } - event_ = 0; - } - - inline uint32_t getEventSize() { - const void *buffer=reinterpret_cast<const void *>(eventSizeBuff_); - return *reinterpret_cast<const uint32_t *>(buffer); - } - - readState() { - event_ = 0; - resetAllValues(); - } - - ~readState() { - if (event_) { - delete(event_); - } - } - -} readState; - -/** - * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events - * to be written to disk. Should be used in the following way: - * 1) Buffer created - * 2) Buffer written to (addEvent) - * 3) Buffer read from (getNext) - * 4) Buffer reset (reset) - * 5) Go back to 2, or destroy buffer - * - * The buffer should never be written to after it is read from, unless it is reset first. - * Note: The above rules are enforced mainly for debugging its sole client TFileTransport - * which uses the buffer in this way. - * - */ -class TFileTransportBuffer { - public: - TFileTransportBuffer(uint32_t size); - ~TFileTransportBuffer(); - - bool addEvent(eventInfo *event); - eventInfo* getNext(); - void reset(); - bool isFull(); - bool isEmpty(); - - private: - TFileTransportBuffer(); // should not be used - - enum mode { - WRITE, - READ - }; - mode bufferMode_; - - uint32_t writePoint_; - uint32_t readPoint_; - uint32_t size_; - eventInfo** buffer_; -}; - -/** - * Abstract interface for transports used to read files - */ -class TFileReaderTransport : virtual public TTransport { - public: - virtual int32_t getReadTimeout() = 0; - virtual void setReadTimeout(int32_t readTimeout) = 0; - - virtual uint32_t getNumChunks() = 0; - virtual uint32_t getCurChunk() = 0; - virtual void seekToChunk(int32_t chunk) = 0; - virtual void seekToEnd() = 0; -}; - -/** - * Abstract interface for transports used to write files - */ -class TFileWriterTransport : virtual public TTransport { - public: - virtual uint32_t getChunkSize() = 0; - virtual void setChunkSize(uint32_t chunkSize) = 0; -}; - -/** - * File implementation of a transport. Reads and writes are done to a - * file on disk. - * - */ -class TFileTransport : public TFileReaderTransport, - public TFileWriterTransport { - public: - TFileTransport(std::string path, bool readOnly=false); - ~TFileTransport(); - - // TODO: what is the correct behaviour for this? - // the log file is generally always open - bool isOpen() { - return true; - } - - void write(const uint8_t* buf, uint32_t len); - void flush(); - - uint32_t readAll(uint8_t* buf, uint32_t len); - uint32_t read(uint8_t* buf, uint32_t len); - bool peek(); - - // log-file specific functions - void seekToChunk(int32_t chunk); - void seekToEnd(); - uint32_t getNumChunks(); - uint32_t getCurChunk(); - - // for changing the output file - void resetOutputFile(int fd, std::string filename, off_t offset); - - // Setter/Getter functions for user-controllable options - void setReadBuffSize(uint32_t readBuffSize) { - if (readBuffSize) { - readBuffSize_ = readBuffSize; - } - } - uint32_t getReadBuffSize() { - return readBuffSize_; - } - - static const int32_t TAIL_READ_TIMEOUT = -1; - static const int32_t NO_TAIL_READ_TIMEOUT = 0; - void setReadTimeout(int32_t readTimeout) { - readTimeout_ = readTimeout; - } - int32_t getReadTimeout() { - return readTimeout_; - } - - void setChunkSize(uint32_t chunkSize) { - if (chunkSize) { - chunkSize_ = chunkSize; - } - } - uint32_t getChunkSize() { - return chunkSize_; - } - - void setEventBufferSize(uint32_t bufferSize) { - if (bufferAndThreadInitialized_) { - GlobalOutput("Cannot change the buffer size after writer thread started"); - return; - } - eventBufferSize_ = bufferSize; - } - - uint32_t getEventBufferSize() { - return eventBufferSize_; - } - - void setFlushMaxUs(uint32_t flushMaxUs) { - if (flushMaxUs) { - flushMaxUs_ = flushMaxUs; - } - } - uint32_t getFlushMaxUs() { - return flushMaxUs_; - } - - void setFlushMaxBytes(uint32_t flushMaxBytes) { - if (flushMaxBytes) { - flushMaxBytes_ = flushMaxBytes; - } - } - uint32_t getFlushMaxBytes() { - return flushMaxBytes_; - } - - void setMaxEventSize(uint32_t maxEventSize) { - maxEventSize_ = maxEventSize; - } - uint32_t getMaxEventSize() { - return maxEventSize_; - } - - void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { - maxCorruptedEvents_ = maxCorruptedEvents; - } - uint32_t getMaxCorruptedEvents() { - return maxCorruptedEvents_; - } - - void setEofSleepTimeUs(uint32_t eofSleepTime) { - if (eofSleepTime) { - eofSleepTime_ = eofSleepTime; - } - } - uint32_t getEofSleepTimeUs() { - return eofSleepTime_; - } - - /* - * Override TTransport *_virt() functions to invoke our implementations. - * We cannot use TVirtualTransport to provide these, since we need to inherit - * virtually from TTransport. - */ - virtual uint32_t read_virt(uint8_t* buf, uint32_t len) { - return this->read(buf, len); - } - virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) { - return this->readAll(buf, len); - } - virtual void write_virt(const uint8_t* buf, uint32_t len) { - this->write(buf, len); - } - - private: - // helper functions for writing to a file - void enqueueEvent(const uint8_t* buf, uint32_t eventLen); - bool swapEventBuffers(struct timeval* deadline); - bool initBufferAndWriteThread(); - - // control for writer thread - static void* startWriterThread(void* ptr) { - static_cast<TFileTransport*>(ptr)->writerThread(); - return NULL; - } - void writerThread(); - - // helper functions for reading from a file - eventInfo* readEvent(); - - // event corruption-related functions - bool isEventCorrupted(); - void performRecovery(); - - // Utility functions - void openLogFile(); - void getNextFlushTime(struct timeval* ts_next_flush); - - // Class variables - readState readState_; - uint8_t* readBuff_; - eventInfo* currentEvent_; - - uint32_t readBuffSize_; - static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; - - int32_t readTimeout_; - static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; - - // size of chunks that file will be split up into - uint32_t chunkSize_; - static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; - - // size of event buffers - uint32_t eventBufferSize_; - static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000; - - // max number of microseconds that can pass without flushing - uint32_t flushMaxUs_; - static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000; - - // max number of bytes that can be written without flushing - uint32_t flushMaxBytes_; - static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; - - // max event size - uint32_t maxEventSize_; - static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; - - // max number of corrupted events per chunk - uint32_t maxCorruptedEvents_; - static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; - - // sleep duration when EOF is hit - uint32_t eofSleepTime_; - static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; - - // sleep duration when a corrupted event is encountered - uint32_t corruptedEventSleepTime_; - static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000; - - // sleep duration in seconds when an IO error is encountered in the writer thread - uint32_t writerThreadIOErrorSleepTime_; - static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; - - // writer thread - apache::thrift::concurrency::PlatformThreadFactory threadFactory_; - boost::shared_ptr<apache::thrift::concurrency::Thread> writerThread_; - - // buffers to hold data before it is flushed. Each element of the buffer stores a msg that - // needs to be written to the file. The buffers are swapped by the writer thread. - TFileTransportBuffer *dequeueBuffer_; - TFileTransportBuffer *enqueueBuffer_; - - // conditions used to block when the buffer is full or empty - Monitor notFull_, notEmpty_; - volatile bool closing_; - - // To keep track of whether the buffer has been flushed - Monitor flushed_; - volatile bool forceFlush_; - - // Mutex that is grabbed when enqueueing and swapping the read/write buffers - Mutex mutex_; - - // File information - std::string filename_; - int fd_; - - // Whether the writer thread and buffers have been initialized - bool bufferAndThreadInitialized_; - - // Offset within the file - off_t offset_; - - // event corruption information - uint32_t lastBadChunk_; - uint32_t numCorruptedEventsInChunk_; - - bool readOnly_; -}; - -// Exception thrown when EOF is hit -class TEOFException : public TTransportException { - public: - TEOFException(): - TTransportException(TTransportException::END_OF_FILE) {}; -}; - - -// wrapper class to process events from a file containing thrift events -class TFileProcessor { - public: - /** - * Constructor that defaults output transport to null transport - * - * @param processor processes log-file events - * @param protocolFactory protocol factory - * @param inputTransport file transport - */ - TFileProcessor(boost::shared_ptr<TProcessor> processor, - boost::shared_ptr<TProtocolFactory> protocolFactory, - boost::shared_ptr<TFileReaderTransport> inputTransport); - - TFileProcessor(boost::shared_ptr<TProcessor> processor, - boost::shared_ptr<TProtocolFactory> inputProtocolFactory, - boost::shared_ptr<TProtocolFactory> outputProtocolFactory, - boost::shared_ptr<TFileReaderTransport> inputTransport); - - /** - * Constructor - * - * @param processor processes log-file events - * @param protocolFactory protocol factory - * @param inputTransport input file transport - * @param output output transport - */ - TFileProcessor(boost::shared_ptr<TProcessor> processor, - boost::shared_ptr<TProtocolFactory> protocolFactory, - boost::shared_ptr<TFileReaderTransport> inputTransport, - boost::shared_ptr<TTransport> outputTransport); - - /** - * processes events from the file - * - * @param numEvents number of events to process (0 for unlimited) - * @param tail tails the file if true - */ - void process(uint32_t numEvents, bool tail); - - /** - * process events until the end of the chunk - * - */ - void processChunk(); - - private: - boost::shared_ptr<TProcessor> processor_; - boost::shared_ptr<TProtocolFactory> inputProtocolFactory_; - boost::shared_ptr<TProtocolFactory> outputProtocolFactory_; - boost::shared_ptr<TFileReaderTransport> inputTransport_; - boost::shared_ptr<TTransport> outputTransport_; -}; - - -}}} // apache::thrift::transport - -#endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp deleted file mode 100644 index cb94d5e..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.cpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <limits> -#include <cstdlib> -#include <sstream> -#include <boost/algorithm/string.hpp> - -#include <thrift/transport/THttpClient.h> -#include <thrift/transport/TSocket.h> - -namespace apache { namespace thrift { namespace transport { - -using namespace std; - -THttpClient::THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path) : - THttpTransport(transport), host_(host), path_(path) { -} - -THttpClient::THttpClient(string host, int port, string path) : - THttpTransport(boost::shared_ptr<TTransport>(new TSocket(host, port))), host_(host), path_(path) { -} - -THttpClient::~THttpClient() {} - -void THttpClient::parseHeader(char* header) { - char* colon = strchr(header, ':'); - if (colon == NULL) { - return; - } - char* value = colon+1; - - if (boost::istarts_with(header, "Transfer-Encoding")) { - if (boost::iends_with(value, "chunked")) { - chunked_ = true; - } - } else if (boost::istarts_with(header, "Content-Length")) { - chunked_ = false; - contentLength_ = atoi(value); - } -} - -bool THttpClient::parseStatusLine(char* status) { - char* http = status; - - char* code = strchr(http, ' '); - if (code == NULL) { - throw TTransportException(string("Bad Status: ") + status); - } - - *code = '\0'; - while (*(code++) == ' ') {}; - - char* msg = strchr(code, ' '); - if (msg == NULL) { - throw TTransportException(string("Bad Status: ") + status); - } - *msg = '\0'; - - if (strcmp(code, "200") == 0) { - // HTTP 200 = OK, we got the response - return true; - } else if (strcmp(code, "100") == 0) { - // HTTP 100 = continue, just keep reading - return false; - } else { - throw TTransportException(string("Bad Status: ") + status); - } -} - -void THttpClient::flush() { - // Fetch the contents of the write buffer - uint8_t* buf; - uint32_t len; - writeBuffer_.getBuffer(&buf, &len); - - // Construct the HTTP header - std::ostringstream h; - h << - "POST " << path_ << " HTTP/1.1" << CRLF << - "Host: " << host_ << CRLF << - "Content-Type: application/x-thrift" << CRLF << - "Content-Length: " << len << CRLF << - "Accept: application/x-thrift" << CRLF << - "User-Agent: Thrift/" << VERSION << " (C++/THttpClient)" << CRLF << - CRLF; - string header = h.str(); - - if(header.size() > (std::numeric_limits<uint32_t>::max)()) - throw TTransportException("Header too big"); - // Write the header, then the data, then flush - transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size())); - transport_->write(buf, len); - transport_->flush(); - - // Reset the buffer and header variables - writeBuffer_.resetBuffer(); - readHeaders_ = true; -} - -}}} // apache::thrift::transport http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h deleted file mode 100644 index 0898b11..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpClient.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_ -#define _THRIFT_TRANSPORT_THTTPCLIENT_H_ 1 - -#include <thrift/transport/THttpTransport.h> - -namespace apache { namespace thrift { namespace transport { - -class THttpClient : public THttpTransport { - public: - THttpClient(boost::shared_ptr<TTransport> transport, std::string host, std::string path=""); - - THttpClient(std::string host, int port, std::string path=""); - - virtual ~THttpClient(); - - virtual void flush(); - - protected: - - std::string host_; - std::string path_; - - virtual void parseHeader(char* header); - virtual bool parseStatusLine(char* status); - -}; - -}}} // apache::thrift::transport - -#endif // #ifndef _THRIFT_TRANSPORT_THTTPCLIENT_H_ http://git-wip-us.apache.org/repos/asf/airavata-php-gateway/blob/d55608f1/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp ---------------------------------------------------------------------- diff --git a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp b/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp deleted file mode 100644 index 1135270..0000000 --- a/airavata-api/airavata-client-sdks/airavata-cpp-sdk/src/main/resources/lib/thrift/transport/THttpServer.cpp +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <cstdlib> -#include <sstream> -#include <iostream> - -#include <thrift/transport/THttpServer.h> -#include <thrift/transport/TSocket.h> - -namespace apache { namespace thrift { namespace transport { - -using namespace std; - -THttpServer::THttpServer(boost::shared_ptr<TTransport> transport) : - THttpTransport(transport) { -} - -THttpServer::~THttpServer() {} - -void THttpServer::parseHeader(char* header) { - char* colon = strchr(header, ':'); - if (colon == NULL) { - return; - } - size_t sz = colon - header; - char* value = colon+1; - - if (strncmp(header, "Transfer-Encoding", sz) == 0) { - if (strstr(value, "chunked") != NULL) { - chunked_ = true; - } - } else if (strncmp(header, "Content-Length", sz) == 0) { - chunked_ = false; - contentLength_ = atoi(value); - } -} - -bool THttpServer::parseStatusLine(char* status) { - char* method = status; - - char* path = strchr(method, ' '); - if (path == NULL) { - throw TTransportException(string("Bad Status: ") + status); - } - - *path = '\0'; - while (*(++path) == ' ') {}; - - char* http = strchr(path, ' '); - if (http == NULL) { - throw TTransportException(string("Bad Status: ") + status); - } - *http = '\0'; - - if (strcmp(method, "POST") == 0) { - // POST method ok, looking for content. - return true; - } - else if (strcmp(method, "OPTIONS") == 0) { - // preflight OPTIONS method, we don't need further content. - // how to graciously close connection? - uint8_t* buf; - uint32_t len; - writeBuffer_.getBuffer(&buf, &len); - - // Construct the HTTP header - std::ostringstream h; - h << - "HTTP/1.1 200 OK" << CRLF << - "Date: " << getTimeRFC1123() << CRLF << - "Access-Control-Allow-Origin: *" << CRLF << - "Access-Control-Allow-Methods: POST, OPTIONS" << CRLF << - "Access-Control-Allow-Headers: Content-Type" << CRLF << - CRLF; - string header = h.str(); - - // Write the header, then the data, then flush - transport_->write((const uint8_t*)header.c_str(), header.size()); - transport_->write(buf, len); - transport_->flush(); - - // Reset the buffer and header variables - writeBuffer_.resetBuffer(); - readHeaders_ = true; - return true; - } - throw TTransportException(string("Bad Status (unsupported method): ") + status); -} - -void THttpServer::flush() { - // Fetch the contents of the write buffer - uint8_t* buf; - uint32_t len; - writeBuffer_.getBuffer(&buf, &len); - - // Construct the HTTP header - std::ostringstream h; - h << - "HTTP/1.1 200 OK" << CRLF << - "Date: " << getTimeRFC1123() << CRLF << - "Server: Thrift/" << VERSION << CRLF << - "Access-Control-Allow-Origin: *" << CRLF << - "Content-Type: application/x-thrift" << CRLF << - "Content-Length: " << len << CRLF << - "Connection: Keep-Alive" << CRLF << - CRLF; - string header = h.str(); - - // Write the header, then the data, then flush - // cast should be fine, because none of "header" is under attacker control - transport_->write((const uint8_t*)header.c_str(), static_cast<uint32_t>(header.size())); - transport_->write(buf, len); - transport_->flush(); - - // Reset the buffer and header variables - writeBuffer_.resetBuffer(); - readHeaders_ = true; -} - -std::string THttpServer::getTimeRFC1123() -{ - static const char* Days[] = {"Sun","Mon","Tue","Wed","Thu","Fri","Sat"}; - static const char* Months[] = {"Jan","Feb","Mar", "Apr", "May", "Jun", "Jul","Aug", "Sep", "Oct","Nov","Dec"}; - char buff[128]; - time_t t = time(NULL); - tm* broken_t = gmtime(&t); - - sprintf(buff,"%s, %d %s %d %d:%d:%d GMT", - Days[broken_t->tm_wday], broken_t->tm_mday, Months[broken_t->tm_mon], - broken_t->tm_year + 1900, - broken_t->tm_hour,broken_t->tm_min,broken_t->tm_sec); - return std::string(buff); -} - -}}} // apache::thrift::transport
