http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/metadata.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/metadata.hpp b/contrib/native/client/src/clientlib/metadata.hpp new file mode 100644 index 0000000..0cc8987 --- /dev/null +++ b/contrib/native/client/src/clientlib/metadata.hpp @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef DRILL_METADATA_H +#define DRILL_METADATA_H + +#include <boost/ref.hpp> + +#include "drill/common.hpp" +#include "drill/drillClient.hpp" +#include "env.h" +#include "User.pb.h" + +namespace Drill { +class DrillClientImpl; + +namespace meta { + class DrillCatalogMetadata: public meta::CatalogMetadata { + public: + DrillCatalogMetadata(const ::exec::user::CatalogMetadata& metadata): + meta::CatalogMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasDescription() const { return m_pMetadata.get().has_description(); } + const std::string& getDescription() const { return m_pMetadata.get().description(); } + + bool hasConnect() const { return m_pMetadata.get().has_connect(); } + const std::string& getConnect() const { return m_pMetadata.get().connect(); } + + private: + boost::reference_wrapper<const ::exec::user::CatalogMetadata> m_pMetadata; + }; + + class DrillSchemaMetadata: public meta::SchemaMetadata { + public: + DrillSchemaMetadata(const ::exec::user::SchemaMetadata& metadata): + meta::SchemaMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasOwnerName() const { return m_pMetadata.get().has_owner(); } + const std::string& getOwner() const { return m_pMetadata.get().owner(); } + + bool hasType() const { return m_pMetadata.get().has_type(); } + const std::string& getType() const { return m_pMetadata.get().type(); } + + bool hasMutable() const { return m_pMetadata.get().has_mutable_(); } + const std::string& getMutable() const { return m_pMetadata.get().mutable_(); } + + private: + boost::reference_wrapper<const ::exec::user::SchemaMetadata> m_pMetadata; + }; + + class DrillTableMetadata: public meta::TableMetadata { + public: + DrillTableMetadata(const ::exec::user::TableMetadata& metadata): + meta::TableMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasTableName() const { return m_pMetadata.get().has_table_name(); } + const std::string& getTableName() const { return m_pMetadata.get().table_name(); } + + bool hasType() const { return m_pMetadata.get().has_type(); } + const std::string& getType() const { return m_pMetadata.get().type(); } + + private: + boost::reference_wrapper<const ::exec::user::TableMetadata> m_pMetadata; + }; + + class DrillColumnMetadata: public meta::ColumnMetadata { + public: + DrillColumnMetadata(const ::exec::user::ColumnMetadata& metadata): + meta::ColumnMetadata(), + m_pMetadata(metadata){ + } + + bool hasCatalogName() const { return m_pMetadata.get().has_catalog_name(); } + const std::string& getCatalogName() const { return m_pMetadata.get().catalog_name(); } + + bool hasSchemaName() const { return m_pMetadata.get().has_schema_name(); } + const std::string& getSchemaName() const { return m_pMetadata.get().schema_name(); } + + bool hasTableName() const { return m_pMetadata.get().has_table_name(); } + const std::string& getTableName() const { return m_pMetadata.get().table_name(); } + + bool hasColumnName() const { return m_pMetadata.get().has_column_name(); } + const std::string& getColumnName() const { return m_pMetadata.get().column_name(); } + + bool hasOrdinalPosition() const { return m_pMetadata.get().has_ordinal_position(); } + std::size_t getOrdinalPosition() const { return m_pMetadata.get().ordinal_position(); } + + bool hasDefaultValue() const { return m_pMetadata.get().has_default_value(); } + const std::string& getDefaultValue() const { return m_pMetadata.get().default_value(); } + + bool hasNullable() const { return m_pMetadata.get().has_is_nullable(); } + bool isNullable() const { return m_pMetadata.get().is_nullable(); } + + bool hasDataType() const { return m_pMetadata.get().has_data_type(); } + const std::string& getDataType() const { return m_pMetadata.get().data_type(); } + + bool hasColumnSize() const { return m_pMetadata.get().has_column_size(); } + std::size_t getColumnSize() const { return m_pMetadata.get().column_size(); } + + bool hasCharMaxLength() const { return m_pMetadata.get().has_char_max_length(); } + std::size_t getCharMaxLength() const { return m_pMetadata.get().char_max_length(); } + + bool hasCharOctetLength() const { return m_pMetadata.get().has_char_octet_length(); } + std::size_t getCharOctetLength() const { return m_pMetadata.get().char_octet_length(); } + + bool hasNumericPrecision() const { return m_pMetadata.get().has_numeric_precision(); } + int32_t getNumericPrecision() const { return m_pMetadata.get().numeric_precision(); } + + bool hasNumericRadix() const { return m_pMetadata.get().has_numeric_precision_radix(); } + int32_t getNumericRadix() const { return m_pMetadata.get().numeric_precision_radix(); } + + bool hasNumericScale() const { return m_pMetadata.get().has_numeric_scale(); } + int32_t getNumericScale() const { return m_pMetadata.get().numeric_scale(); } + + bool hasIntervalType() const { return m_pMetadata.get().has_interval_type(); } + const std::string& getIntervalType() const { return m_pMetadata.get().interval_type(); } + + bool hasIntervalPrecision() const { return m_pMetadata.get().has_interval_precision(); } + int32_t getIntervalPrecision() const { return m_pMetadata.get().interval_precision(); } + + private: + boost::reference_wrapper<const ::exec::user::ColumnMetadata> m_pMetadata; + }; + + class DrillMetadata: public Metadata { + public: + static const std::string s_connectorName; + static const std::string s_connectorVersion; + + static const std::string s_serverName; + static const std::string s_serverVersion; + + static const std::string s_catalogSeparator; + static const std::string s_catalogTerm; + + static const std::string s_identifierQuoteString; + static const std::vector<std::string> s_sqlKeywords; + static const std::vector<std::string> s_numericFunctions; + static const std::string s_schemaTerm; + static const std::string s_searchEscapeString; + static const std::string s_specialCharacters; + static const std::vector<std::string> s_stringFunctions; + static const std::vector<std::string> s_systemFunctions; + static const std::string s_tableTerm; + static const std::vector<std::string> s_dateTimeFunctions; + + DrillMetadata(DrillClientImpl& client): Metadata(), m_client(client) {} + ~DrillMetadata() {} + + DrillClientImpl& client() { return m_client; } + + const std::string& getConnectorName() const { return s_connectorName; }; + const std::string& getConnectorVersion() const { return s_connectorVersion; } + uint32_t getConnectorMajorVersion() const { return DRILL_VERSION_MAJOR; } + uint32_t getConnectorMinorVersion() const { return DRILL_VERSION_MINOR; } + uint32_t getConnectorPatchVersion() const { return DRILL_VERSION_PATCH; } + + const std::string& getServerName() const; + const std::string& getServerVersion() const; + uint32_t getServerMajorVersion() const; + uint32_t getServerMinorVersion() const; + uint32_t getServerPatchVersion() const; + + status_t getCatalogs(const std::string& catalogPattern, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + status_t getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std:: string& tablePattern, const std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* listenerCtx, QueryHandle_t* qHandle); + + bool areAllTableSelectable() const { return false; } + bool isCatalogAtStart() const { return true; } + const std::string& getCatalogSeparator() const { return s_catalogSeparator; } + const std::string& getCatalogTerm() const { return s_catalogTerm; } + bool isColumnAliasingSupported() const { return true; } + bool isNullPlusNonNullNull() const { return true; } + bool isConvertSupported(common::MinorType from, common::MinorType to) const; + meta::CorrelationNamesSupport getCorrelationNames() const { return meta::CN_ANY_NAMES; } + bool isReadOnly() const { return false; } + meta::DateTimeLiteralSupport getDateTimeLiteralsSupport() const { + return DL_DATE + | DL_TIME + | DL_TIMESTAMP + | DL_INTERVAL_YEAR + | DL_INTERVAL_MONTH + | DL_INTERVAL_DAY + | DL_INTERVAL_HOUR + | DL_INTERVAL_MINUTE + | DL_INTERVAL_SECOND + | DL_INTERVAL_YEAR_TO_MONTH + | DL_INTERVAL_DAY_TO_HOUR + | DL_INTERVAL_DAY_TO_MINUTE + | DL_INTERVAL_DAY_TO_SECOND + | DL_INTERVAL_HOUR_TO_MINUTE + | DL_INTERVAL_HOUR_TO_SECOND + | DL_INTERVAL_MINUTE_TO_SECOND; + } + + meta::CollateSupport getCollateSupport() const { return meta::C_NONE; }// supported? + meta::GroupBySupport getGroupBySupport() const { return meta::GB_UNRELATED; } + meta::IdentifierCase getIdentifierCase() const { return meta::IC_STORES_UPPER; } // to check? + + const std::string& getIdentifierQuoteString() const { return s_identifierQuoteString; } + const std::vector<std::string>& getSQLKeywords() const { return s_sqlKeywords; } + bool isLikeEscapeClauseSupported() const { return true; } + std::size_t getMaxBinaryLiteralLength() const { return 0; } + std::size_t getMaxCatalogNameLength() const { return 0; } + std::size_t getMaxCharLiteralLength() const { return 0; } + std::size_t getMaxColumnNameLength() const { return 0; } + std::size_t getMaxColumnsInGroupBy() const { return 0; } + std::size_t getMaxColumnsInOrderBy() const { return 0; } + std::size_t getMaxColumnsInSelect() const { return 0; } + std::size_t getMaxCursorNameLength() const { return 0; } + std::size_t getMaxLogicalLobSize() const { return 0; } + std::size_t getMaxStatements() const { return 0; } + std::size_t getMaxRowSize() const { return 0; } + bool isBlobIncludedInMaxRowSize() const { return true; } + std::size_t getMaxSchemaNameLength() const { return 0; } + std::size_t getMaxStatementLength() const { return 0; } + std::size_t getMaxTableNameLength() const { return 0; } + std::size_t getMaxTablesInSelect() const { return 0; } + std::size_t getMaxUserNameLength() const { return 0; } + meta::NullCollation getNullCollation() const { return meta::NC_AT_END; } + const std::vector<std::string>& getNumericFunctions() const { return s_numericFunctions; } + meta::OuterJoinSupport getOuterJoinSupport() const { return meta::OJ_LEFT + | meta::OJ_RIGHT + | meta::OJ_FULL; + } + bool isUnrelatedColumnsInOrderBySupported() const { return true; } + meta::QuotedIdentifierCase getQuotedIdentifierCase() const { return meta::QIC_SUPPORTS_MIXED; } + const std::string& getSchemaTerm() const { return s_schemaTerm; } + const std::string& getSearchEscapeString() const { return s_searchEscapeString; } + const std::string& getSpecialCharacters() const { return s_specialCharacters; } + const std::vector<std::string>& getStringFunctions() const { return s_stringFunctions; } + meta::SubQuerySupport getSubQuerySupport() const { return SQ_CORRELATED + | SQ_IN_COMPARISON + | SQ_IN_EXISTS + | SQ_IN_QUANTIFIED; + } + const std::vector<std::string>& getSystemFunctions() const { return s_systemFunctions; } + const std::string& getTableTerm() const { return s_tableTerm; } + const std::vector<std::string>& getDateTimeFunctions() const { return s_dateTimeFunctions; } + bool isTransactionSupported() const { return false; } + meta::UnionSupport getUnionSupport() const { return meta::U_UNION | meta::U_UNION_ALL; } + bool isSelectForUpdateSupported() const { return false; } + + private: + DrillClientImpl& m_client; + }; +} // namespace meta +} // namespace Drill + +#endif // DRILL_METADATA
http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/recordBatch.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp b/contrib/native/client/src/clientlib/recordBatch.cpp index c6c033b..6e13293 100644 --- a/contrib/native/client/src/clientlib/recordBatch.cpp +++ b/contrib/native/client/src/clientlib/recordBatch.cpp @@ -17,6 +17,7 @@ */ #include "drill/common.hpp" +#include "drill/fieldmeta.hpp" #include "drill/recordBatch.hpp" #include "utils.hpp" #include "../protobuf/User.pb.h" @@ -403,17 +404,6 @@ bool RecordBatch::isLastChunk(){ -void FieldMetadata::set(const exec::shared::SerializedField& f){ - m_name=f.name_part().name(); - m_minorType=f.major_type().minor_type(); - m_dataMode=f.major_type().mode(); - m_valueCount=f.value_count(); - m_scale=f.major_type().scale(); - m_precision=f.major_type().precision(); - m_bufferLength=f.buffer_length(); -} - - void DateHolder::load(){ m_year=1970; m_month=1; http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp b/contrib/native/client/src/clientlib/rpcDecoder.cpp deleted file mode 100644 index d3cf50c..0000000 --- a/contrib/native/client/src/clientlib/rpcDecoder.cpp +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include <iostream> -#include <google/protobuf/io/coded_stream.h> -#include "drill/common.hpp" -#include "rpcEncoder.hpp" -#include "rpcDecoder.hpp" -#include "rpcMessage.hpp" - -namespace Drill{ - -// return the number of bytes we have read -int RpcDecoder::LengthDecode(const uint8_t* buf, uint32_t* p_length) { - - using google::protobuf::io::CodedInputStream; - - // read the frame to get the length of the message and then - - CodedInputStream* cis = new CodedInputStream(buf, 5); // read 5 bytes at most - - int pos0 = cis->CurrentPosition(); // for debugging - cis->ReadVarint32(p_length); - - #ifdef CODER_DEBUG - cerr << "p_length = " << *p_length << endl; - #endif - - int pos1 = cis->CurrentPosition(); - - #ifdef CODER_DEBUG - cerr << "Reading full length " << *p_length << endl; - #endif - assert( (pos1-pos0) == getRawVarintSize(*p_length)); - delete cis; - return (pos1-pos0); -} - -// TODO: error handling -// -// - assume that the entire message is in the buffer and the buffer is constrained to this message -// - easy to handle with raw arry in C++ -int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) { - using google::protobuf::io::CodedInputStream; - - // if(!ctx.channel().isOpen()){ return; } - - #ifdef EXTRA_DEBUGGING - std::cerr << "\nInbound rpc message received." << std::endl; - #endif - - CodedInputStream* cis = new CodedInputStream(buf, length); - - - int pos0 = cis->CurrentPosition(); // for debugging - - int len_limit = cis->PushLimit(length); - - uint32_t header_length = 0; - cis->ExpectTag(RpcEncoder::HEADER_TAG); - cis->ReadVarint32(&header_length); - - #ifdef CODER_DEBUG - cerr << "Reading header length " << header_length << ", post read index " << cis->CurrentPosition() << endl; - #endif - - exec::rpc::RpcHeader header; - int header_limit = cis->PushLimit(header_length); - header.ParseFromCodedStream(cis); - cis->PopLimit(header_limit); - msg.m_has_mode = header.has_mode(); - msg.m_mode = header.mode(); - msg.m_coord_id = header.coordination_id(); - msg.m_has_rpc_type = header.has_rpc_type(); - msg.m_rpc_type = header.rpc_type(); - - //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index {}", buffer.readerIndex()); - - // read the protobuf body into a buffer. - cis->ExpectTag(RpcEncoder::PROTOBUF_BODY_TAG); - uint32_t p_body_length = 0; - cis->ReadVarint32(&p_body_length); - - #ifdef CODER_DEBUG - cerr << "Reading protobuf body length " << p_body_length << ", post read index " << cis->CurrentPosition() << endl; - #endif - - msg.m_pbody.resize(p_body_length); - cis->ReadRaw(msg.m_pbody.data(),p_body_length); - - - // read the data body. - if (cis->BytesUntilLimit() > 0 ) { - #ifdef CODER_DEBUG - cerr << "Reading raw body, buffer has "<< cis->BytesUntilLimit() << " bytes available, current possion "<< cis->CurrentPosition() << endl; - #endif - cis->ExpectTag(RpcEncoder::RAW_BODY_TAG); - uint32_t d_body_length = 0; - cis->ReadVarint32(&d_body_length); - - if(cis->BytesUntilLimit() != d_body_length) { - #ifdef CODER_DEBUG - cerr << "Expected to receive a raw body of " << d_body_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl; - #endif - } - //msg.m_dbody.resize(d_body_length); - //cis->ReadRaw(msg.m_dbody.data(), d_body_length); - uint32_t currPos=cis->CurrentPosition(); - cis->GetDirectBufferPointer((const void**)&msg.m_dbody, (int*)&d_body_length); - assert(msg.m_dbody==buf+currPos); - cis->Skip(d_body_length); - #ifdef CODER_DEBUG - cerr << "Read raw body of " << d_body_length << " bytes" << endl; - #endif - } else { - #ifdef CODER_DEBUG - cerr << "No need to read raw body, no readable bytes left." << endl; - #endif - } - cis->PopLimit(len_limit); - - - // return the rpc message. - // move the reader index forward so the next rpc call won't try to work with it. - // buffer.skipBytes(dBodyLength); - // messageCounter.incrementAndGet(); - #ifdef CODER_DEBUG - cerr << "Inbound Rpc Message Decoded " << msg << endl; - #endif - - int pos1 = cis->CurrentPosition(); - assert((pos1-pos0) == length); - delete cis; - return (pos1-pos0); -} - -}//namespace Drill http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcDecoder.hpp b/contrib/native/client/src/clientlib/rpcDecoder.hpp deleted file mode 100644 index dca49f7..0000000 --- a/contrib/native/client/src/clientlib/rpcDecoder.hpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef RPC_DECODER_H -#define RPC_DECODER_H - -#include "rpcMessage.hpp" - -namespace Drill { - -class RpcDecoder { - public: - RpcDecoder() { } - ~RpcDecoder() { } - // bool Decode(const DataBuf& buf); - // bool Decode(const DataBuf& buf, InBoundRpcMessage& msg); - static int LengthDecode(const uint8_t* buf, uint32_t* length); // read the length prefix (at most 4 bytes) - static int Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg); -}; - -} // namespace Drill -#endif http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcEncoder.cpp b/contrib/native/client/src/clientlib/rpcEncoder.cpp deleted file mode 100644 index 2f354d7..0000000 --- a/contrib/native/client/src/clientlib/rpcEncoder.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#include <google/protobuf/io/coded_stream.h> -#include <google/protobuf/io/zero_copy_stream_impl_lite.h> -#include <google/protobuf/message_lite.h> -#include <google/protobuf/wire_format_lite.h> - -#include "drill/common.hpp" -#include "rpcEncoder.hpp" -#include "rpcMessage.hpp" - -namespace Drill{ - -using google::protobuf::internal::WireFormatLite; -using exec::rpc::CompleteRpcMessage; - -const uint32_t RpcEncoder::HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); -const uint32_t RpcEncoder::HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG); -const uint32_t RpcEncoder::PROTOBUF_BODY_TAG_LENGTH = getRawVarintSize(PROTOBUF_BODY_TAG); -const uint32_t RpcEncoder::RAW_BODY_TAG_LENGTH = getRawVarintSize(RAW_BODY_TAG); - - -bool RpcEncoder::Encode(DataBuf& buf, OutBoundRpcMessage& msg) { - using exec::rpc::RpcHeader; - using google::protobuf::io::CodedOutputStream; - using google::protobuf::io::ArrayOutputStream; - // Todo: - // - // - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();` - // - builder pattern - // - #ifdef CODER_DEBUG - cerr << "\nEncoding outbound message " << msg << endl; - #endif - - RpcHeader header; - header.set_mode(msg.m_mode); - header.set_coordination_id(msg.m_coord_id); - header.set_rpc_type(msg.m_rpc_type); - - // calcute the length of the message - int header_length = header.ByteSize(); - int proto_body_length = msg.m_pbody->ByteSize(); - int full_length = HEADER_TAG_LENGTH + getRawVarintSize(header_length) + header_length + \ - PROTOBUF_BODY_TAG_LENGTH + getRawVarintSize(proto_body_length) + proto_body_length; - - /* - if(raw_body_length > 0) { - full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length); - } - */ - - buf.resize(full_length + getRawVarintSize(full_length)); - ArrayOutputStream* os = new ArrayOutputStream(buf.data(), buf.size()); - CodedOutputStream* cos = new CodedOutputStream(os); - - - #ifdef CODER_DEBUG - cerr << "Writing full length " << full_length << endl; - #endif - - // write full length first (this is length delimited stream). - cos->WriteVarint32(full_length); - - #ifdef CODER_DEBUG - cerr << "Writing header length " << header_length << endl; - #endif - - cos->WriteVarint32(HEADER_TAG); - cos->WriteVarint32(header_length); - - header.SerializeToCodedStream(cos); - - // write protobuf body length and body - #ifdef CODER_DEBUG - cerr << "Writing protobuf body length " << proto_body_length << endl; - #endif - - cos->WriteVarint32(PROTOBUF_BODY_TAG); - cos->WriteVarint32(proto_body_length); - msg.m_pbody->SerializeToCodedStream(cos); - - delete os; - delete cos; - - // Done! no read to write data body for client - return true; -} - -} // namespace Drill http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcEncoder.hpp b/contrib/native/client/src/clientlib/rpcEncoder.hpp deleted file mode 100644 index a4a7216..0000000 --- a/contrib/native/client/src/clientlib/rpcEncoder.hpp +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -#ifndef RPC_ENCODER_H -#define RPC_ENCODER_H - -#include "rpcMessage.hpp" - -namespace Drill { - -class RpcEncoder { - public: - RpcEncoder() {} - ~RpcEncoder() { } - bool Encode(DataBuf& buf,OutBoundRpcMessage& msg); - static const uint32_t HEADER_TAG; - static const uint32_t PROTOBUF_BODY_TAG; - static const uint32_t RAW_BODY_TAG; - static const uint32_t HEADER_TAG_LENGTH; - static const uint32_t PROTOBUF_BODY_TAG_LENGTH; - static const uint32_t RAW_BODY_TAG_LENGTH; -}; - -// copy from java code -inline int getRawVarintSize(uint32_t value) { - int count = 0; - while (true) { - if ((value & ~0x7F) == 0) { - count++; - return count; - } else { - count++; - value >>= 7; - } - } -} - -} // namespace Drill -#endif http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp b/contrib/native/client/src/clientlib/rpcMessage.cpp new file mode 100644 index 0000000..13cd7a8 --- /dev/null +++ b/contrib/native/client/src/clientlib/rpcMessage.cpp @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include <google/protobuf/io/coded_stream.h> +#include <google/protobuf/io/zero_copy_stream_impl_lite.h> +#include <google/protobuf/message_lite.h> +#include <google/protobuf/wire_format_lite.h> + +#include "drill/common.hpp" +#include "rpcMessage.hpp" + +namespace Drill{ +namespace rpc { + + +namespace { +using google::protobuf::internal::WireFormatLite; +using google::protobuf::io::CodedOutputStream; +using exec::rpc::CompleteRpcMessage; + +static const uint32_t HEADER_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t PROTOBUF_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t RAW_BODY_TAG = WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, WireFormatLite::WIRETYPE_LENGTH_DELIMITED); +static const uint32_t HEADER_TAG_LENGTH = CodedOutputStream::VarintSize32(HEADER_TAG); +static const uint32_t PROTOBUF_BODY_TAG_LENGTH = CodedOutputStream::VarintSize32(PROTOBUF_BODY_TAG); +} + +std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) { + using google::protobuf::io::CodedInputStream; + using google::protobuf::io::CodedOutputStream; + + // read the frame to get the length of the message and then + + CodedInputStream cis(buf, 5); // read 5 bytes at most + + int startPos(cis.CurrentPosition()); // for debugging + if (!cis.ReadVarint32(&length)) { + return -1; + } + + #ifdef CODER_DEBUG + std::cerr << "length = " << length << std::endl; + #endif + + int endPos(cis.CurrentPosition()); + + assert((endPos-startPos) == CodedOutputStream::VarintSize32(length)); + return (endPos-startPos); +} + +// TODO: error handling +// +// - assume that the entire message is in the buffer and the buffer is constrained to this message +// - easy to handle with raw array in C++ +bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) { + using google::protobuf::io::CodedInputStream; + + CodedInputStream cis(buf, length); + + int startPos(cis.CurrentPosition()); // for debugging + + CodedInputStream::Limit len_limit(cis.PushLimit(length)); + + uint32_t header_length(0); + + if (!cis.ExpectTag(HEADER_TAG)) { + return false; + } + + if (!cis.ReadVarint32(&header_length)) { + return false; + } + + #ifdef CODER_DEBUG + std::cerr << "Reading header length " << header_length << ", post read index " << cis.CurrentPosition() << std::endl; + #endif + + exec::rpc::RpcHeader header; + CodedInputStream::Limit header_limit(cis.PushLimit(header_length)); + + if (!header.ParseFromCodedStream(&cis)) { + return false; + } + cis.PopLimit(header_limit); + + msg.m_has_mode = header.has_mode(); + msg.m_mode = header.mode(); + msg.m_coord_id = header.coordination_id(); + msg.m_has_rpc_type = header.has_rpc_type(); + msg.m_rpc_type = header.rpc_type(); + + // read the protobuf body into a buffer. + if (!cis.ExpectTag(PROTOBUF_BODY_TAG)) { + return false; + } + + uint32_t pbody_length(0); + if (!cis.ReadVarint32(&pbody_length)) { + return false; + } + + #ifdef CODER_DEBUG + std::cerr << "Reading protobuf body length " << pbody_length << ", post read index " << cis.CurrentPosition() << std::endl; + #endif + + msg.m_pbody.resize(pbody_length); + if (!cis.ReadRaw(msg.m_pbody.data(), pbody_length)) { + return false; + } + + // read the data body. + if (cis.BytesUntilLimit() > 0 ) { + #ifdef CODER_DEBUG + std::cerr << "Reading raw body, buffer has "<< std::cis->BytesUntilLimit() << " bytes available, current possion "<< cis.CurrentPosition() << endl; + #endif + if (!cis.ExpectTag(RAW_BODY_TAG)) { + return false; + } + + uint32_t dbody_length = 0; + if (!cis.ReadVarint32(&dbody_length)) { + return false; + } + + if(cis.BytesUntilLimit() != dbody_length) { + #ifdef CODER_DEBUG + cerr << "Expected to receive a raw body of " << dbody_length << " bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << endl; + #endif + return false; + } + + int currPos(cis.CurrentPosition()); + int size; + cis.GetDirectBufferPointer(const_cast<const void**>(reinterpret_cast<void**>(&msg.m_dbody)), &size); + cis.Skip(size); + + assert(dbody_length == size); + assert(msg.m_dbody==buf+currPos); + #ifdef CODER_DEBUG + cerr << "Read raw body of " << dbody_length << " bytes" << endl; + #endif + } else { + #ifdef CODER_DEBUG + cerr << "No need to read raw body, no readable bytes left." << endl; + #endif + } + cis.PopLimit(len_limit); + + + // return the rpc message. + // move the reader index forward so the next rpc call won't try to work with it. + // buffer.skipBytes(dBodyLength); + // messageCounter.incrementAndGet(); + #ifdef CODER_DEBUG + std::cerr << "Inbound Rpc Message Decoded " << msg << std::endl; + #endif + + int endPos = cis.CurrentPosition(); + assert((endPos-startPos) == length); + return true; +} + + +bool encode(DataBuf& buf, const OutBoundRpcMessage& msg) { + using exec::rpc::RpcHeader; + using google::protobuf::io::CodedOutputStream; + // Todo: + // + // - let a context manager to allocate a buffer `ByteBuf buf = ctx.alloc().buffer();` + // - builder pattern + // + #ifdef CODER_DEBUG + std::cerr << "Encoding outbound message " << msg << std::endl; + #endif + + RpcHeader header; + header.set_mode(msg.m_mode); + header.set_coordination_id(msg.m_coord_id); + header.set_rpc_type(msg.m_rpc_type); + + // calcute the length of the message + int header_length = header.ByteSize(); + int proto_body_length = msg.m_pbody->ByteSize(); + int full_length = HEADER_TAG_LENGTH + CodedOutputStream::VarintSize32(header_length) + header_length + \ + PROTOBUF_BODY_TAG_LENGTH + CodedOutputStream::VarintSize32(proto_body_length) + proto_body_length; + + /* + if(raw_body_length > 0) { + full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) + raw_body_length); + } + */ + + buf.resize(full_length + CodedOutputStream::VarintSize32(full_length)); + + uint8_t* data = buf.data(); + + #ifdef CODER_DEBUG + std::cerr << "Writing full length " << full_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(full_length, data); + + #ifdef CODER_DEBUG + std::cerr << "Writing header length " << header_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(HEADER_TAG, data); + data = CodedOutputStream::WriteVarint32ToArray(header_length, data); + + data = header.SerializeWithCachedSizesToArray(data); + + // write protobuf body length and body + #ifdef CODER_DEBUG + std::cerr << "Writing protobuf body length " << proto_body_length << std::endl; + #endif + + data = CodedOutputStream::WriteVarint32ToArray(PROTOBUF_BODY_TAG, data); + data = CodedOutputStream::WriteVarint32ToArray(proto_body_length, data); + msg.m_pbody->SerializeWithCachedSizesToArray(data); + + // Done! no read to write data body for client + return true; +} +} // namespace rpc +} // namespace Drill http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp b/contrib/native/client/src/clientlib/rpcMessage.hpp index 6696971..15487e9 100644 --- a/contrib/native/client/src/clientlib/rpcMessage.hpp +++ b/contrib/native/client/src/clientlib/rpcMessage.hpp @@ -25,8 +25,8 @@ #include "GeneralRPC.pb.h" namespace Drill { - -class InBoundRpcMessage { +namespace rpc { +struct InBoundRpcMessage { public: exec::rpc::RpcMode m_mode; int m_rpc_type; @@ -39,7 +39,7 @@ class InBoundRpcMessage { bool has_rpc_type() { return m_has_rpc_type; }; }; -class OutBoundRpcMessage { +struct OutBoundRpcMessage { public: exec::rpc::RpcMode m_mode; int m_rpc_type; @@ -49,6 +49,13 @@ class OutBoundRpcMessage { m_mode(mode), m_rpc_type(rpc_type), m_coord_id(coord_id), m_pbody(pbody) { } }; -} +std::size_t lengthDecode(const uint8_t* buf, uint32_t& length); + +bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg); + +bool encode(DataBuf& buf, const OutBoundRpcMessage& msg); + +} // namespace rpc +} // namespace Drill #endif http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/utils.cpp b/contrib/native/client/src/clientlib/utils.cpp index 1e6a877..d3c8f08 100644 --- a/contrib/native/client/src/clientlib/utils.cpp +++ b/contrib/native/client/src/clientlib/utils.cpp @@ -22,6 +22,13 @@ #include "logger.hpp" #include "drill/common.hpp" +#if defined _WIN32 || defined _WIN64 +//Windows header files redefine 'max' +#ifdef max +#undef max +#endif +#endif + namespace Drill{ http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/utils.hpp b/contrib/native/client/src/clientlib/utils.hpp index 3237aa3..4cd8fa5 100644 --- a/contrib/native/client/src/clientlib/utils.hpp +++ b/contrib/native/client/src/clientlib/utils.hpp @@ -31,7 +31,6 @@ #undef random #endif #endif -#include <boost/asio/deadline_timer.hpp> #include <boost/random/mersenne_twister.hpp> // for mt19937 #include <boost/random/random_device.hpp> #include <boost/random/uniform_int.hpp> http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/y2038/time64.c ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/y2038/time64.c b/contrib/native/client/src/clientlib/y2038/time64.c index e0d61c8..bbbabe2 100644 --- a/contrib/native/client/src/clientlib/y2038/time64.c +++ b/contrib/native/client/src/clientlib/y2038/time64.c @@ -110,15 +110,15 @@ static const int safe_years_low[SOLAR_CYCLE_LENGTH] = { }; /* This isn't used, but it's handy to look at */ -static const char dow_year_start[SOLAR_CYCLE_LENGTH] = { - 5, 0, 1, 2, /* 0 2016 - 2019 */ - 3, 5, 6, 0, /* 4 */ - 1, 3, 4, 5, /* 8 1996 - 1998, 1971*/ - 6, 1, 2, 3, /* 12 1972 - 1975 */ - 4, 6, 0, 1, /* 16 */ - 2, 4, 5, 6, /* 20 2036, 2037, 2010, 2011 */ - 0, 2, 3, 4 /* 24 2012, 2013, 2014, 2015 */ -}; +//static const char dow_year_start[SOLAR_CYCLE_LENGTH] = { +// 5, 0, 1, 2, /* 0 2016 - 2019 */ +// 3, 5, 6, 0, /* 4 */ +// 1, 3, 4, 5, /* 8 1996 - 1998, 1971*/ +// 6, 1, 2, 3, /* 12 1972 - 1975 */ +// 4, 6, 0, 1, /* 16 */ +// 2, 4, 5, 6, /* 20 2036, 2037, 2010, 2011 */ +// 0, 2, 3, 4 /* 24 2012, 2013, 2014, 2015 */ +//}; /* Let's assume people are going to be looking for dates in the future. Let's provide some cheats so you can skip ahead. http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.cpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/zookeeperClient.cpp b/contrib/native/client/src/clientlib/zookeeperClient.cpp new file mode 100644 index 0000000..535bebc --- /dev/null +++ b/contrib/native/client/src/clientlib/zookeeperClient.cpp @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <boost/bind.hpp> +#include <drill/drillClient.hpp> +#include "zookeeperClient.hpp" + +#include "errmsgs.hpp" +#include "logger.hpp" + +namespace Drill { +std::string ZookeeperClient::s_defaultDrillPath("/drill/drillbits1"); +static void watcherCallback(zhandle_t *zzh, int type, int state, const char *path, void* context) { + static_cast<ZookeeperClient*>(context)->watcher(zzh, type, state, path, context); +} + +ZookeeperClient::ZookeeperClient(const std::string& drillPath) +: p_zh(), m_state(), m_path(drillPath) { + m_bConnecting=true; + memset(&m_id, 0, sizeof(m_id)); +} + +ZookeeperClient::~ZookeeperClient(){ +} + +ZooLogLevel ZookeeperClient::getZkLogLevel(){ + //typedef enum {ZOO_LOG_LEVEL_ERROR=1, + // ZOO_LOG_LEVEL_WARN=2, + // ZOO_LOG_LEVEL_INFO=3, + // ZOO_LOG_LEVEL_DEBUG=4 + //} ZooLogLevel; + switch(DrillClientConfig::getLogLevel()){ + case LOG_TRACE: + case LOG_DEBUG: + return ZOO_LOG_LEVEL_DEBUG; + case LOG_INFO: + return ZOO_LOG_LEVEL_INFO; + case LOG_WARNING: + return ZOO_LOG_LEVEL_WARN; + case LOG_ERROR: + case LOG_FATAL: + default: + return ZOO_LOG_LEVEL_ERROR; + } + return ZOO_LOG_LEVEL_ERROR; +} + +void ZookeeperClient::watcher(zhandle_t *zzh, int type, int state, const char *path, void*) { + //From cli.c + + /* Be careful using zh here rather than zzh - as this may be mt code + * the client lib may call the watcher before zookeeper_init returns */ + + this->m_state=state; + if (type == ZOO_SESSION_EVENT) { + if (state == ZOO_CONNECTED_STATE) { + } else if (state == ZOO_AUTH_FAILED_STATE) { + this->m_err= getMessage(ERR_CONN_ZKNOAUTH); + this->close(); + } else if (state == ZOO_EXPIRED_SESSION_STATE) { + this->m_err= getMessage(ERR_CONN_ZKEXP); + this->close(); + } + } + // signal the cond var + { + if (state == ZOO_CONNECTED_STATE){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;) + } + boost::lock_guard<boost::mutex> bufferLock(this->m_cvMutex); + this->m_bConnecting=false; + } + this->m_cv.notify_one(); +} + +int ZookeeperClient::getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits){ + uint32_t waitTime=30000; // 10 seconds + zoo_set_debug_level(getZkLogLevel()); + zoo_deterministic_conn_order(1); // enable deterministic order + + p_zh = boost::shared_ptr<zhandle_t>(zookeeper_init(connectStr.c_str(), &watcherCallback, waitTime, &m_id, this, 0), zookeeper_close); + if(!p_zh) { + m_err = getMessage(ERR_CONN_ZKFAIL); + return -1; + } + + m_err=""; + //Wait for the completion handler to signal successful connection + boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex); + boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime); + while(this->m_bConnecting) { + if(!this->m_cv.timed_wait(bufferLock, timeout)){ + m_err = getMessage(ERR_CONN_ZKTIMOUT); + return -1; + } + } + + if(m_state!=ZOO_CONNECTED_STATE){ + return -1; + } + + int rc = ZOK; + + struct String_vector drillbitsVector; + rc=zoo_get_children(p_zh.get(), m_path.c_str(), 0, &drillbitsVector); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKERR, rc); + p_zh.reset(); + return -1; + } + + // Make sure we deallocate drillbitsVector properly when we exit + boost::shared_ptr<String_vector> guard(&drillbitsVector, deallocate_String_vector); + + if(drillbitsVector.count > 0){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << drillbitsVector.count << " drillbits in cluster (" + << connectStr << "/" << m_path + << ")." <<std::endl;) + for(int i=0; i<drillbitsVector.count; i++){ + drillbits.push_back(drillbitsVector.data[i]); + } + for(int i=0; i<drillbits.size(); i++){ + DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;) + } + } + return 0; +} + +int ZookeeperClient::getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint){ + int rc = ZOK; + // pick the drillbit at 'index' + std::string s(m_path + "/" + drillbit); + int buffer_len=MAX_CONNECT_STR; + char buffer[MAX_CONNECT_STR+1]; + struct Stat stat; + buffer[MAX_CONNECT_STR]=0; + rc= zoo_get(p_zh.get(), s.c_str(), 0, buffer, &buffer_len, &stat); + if(rc!=ZOK){ + m_err=getMessage(ERR_CONN_ZKDBITERR, rc); + return -1; + } + exec::DrillServiceInstance drillServiceInstance; + drillServiceInstance.ParseFromArray(buffer, buffer_len); + endpoint=drillServiceInstance.endpoint(); + + return 0; +} + +void ZookeeperClient::close(){ + p_zh.reset(); +} + +} /* namespace Drill */ http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/clientlib/zookeeperClient.hpp b/contrib/native/client/src/clientlib/zookeeperClient.hpp new file mode 100644 index 0000000..25d6af5 --- /dev/null +++ b/contrib/native/client/src/clientlib/zookeeperClient.hpp @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef _WIN32 +#include <zookeeper.h> +#else +#include <zookeeper/zookeeper.h> +#endif + +#include <boost/shared_ptr.hpp> +#include <boost/thread/condition_variable.hpp> +#include <boost/thread/mutex.hpp> + +#include "UserBitShared.pb.h" + + +#ifndef ZOOKEEPER_CLIENT_H +#define ZOOKEEPER_CLIENT_H + +namespace Drill { +class ZookeeperClient{ + public: + static std::string s_defaultDrillPath; + + ZookeeperClient(const std::string& drillPath = s_defaultDrillPath); + ~ZookeeperClient(); + static ZooLogLevel getZkLogLevel(); + // comma separated host:port pairs, each corresponding to a zk + // server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002 + void close(); + const std::string& getError() const{return m_err;} + // return unshuffled list of drillbits + int getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits); + // picks the index drillbit and returns the corresponding endpoint object + int getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint); + + void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context); + + private: + boost::shared_ptr<zhandle_t> p_zh; + clientid_t m_id; + int m_state; + std::string m_err; + + boost::mutex m_cvMutex; + // Condition variable to signal connection callback has been processed + boost::condition_variable m_cv; + bool m_bConnecting; + std::string m_path; + +}; +} /* namespace Drill */ + + + +#endif /* ZOOKEEPER_H */ http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/collections.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/include/drill/collections.hpp b/contrib/native/client/src/include/drill/collections.hpp new file mode 100644 index 0000000..9fbfcc5 --- /dev/null +++ b/contrib/native/client/src/include/drill/collections.hpp @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _DRILL_COLLECTIONS_H +#define _DRILL_COLLECTIONS_H + +#include <iterator> + +#include <boost/noncopyable.hpp> +#include <boost/shared_ptr.hpp> + +namespace Drill { +namespace impl { + +/** + * Interface for internal iterators + */ +template<typename T> +class DrillIteratorImpl: private boost::noncopyable { +public: + typedef DrillIteratorImpl<T> iterator; + typedef boost::shared_ptr<iterator> iterator_ptr; + + typedef T value_type; + typedef value_type& reference; + typedef value_type* pointer; + + virtual ~DrillIteratorImpl() {}; + + // To allow conversion from non-const to const types + virtual operator typename DrillIteratorImpl<const T>::iterator_ptr() const = 0; + + virtual reference operator*() const = 0; + virtual pointer operator->() const = 0; + + virtual iterator& operator++() = 0; + + virtual bool operator==(const iterator& x) const = 0; + virtual bool operator!=(const iterator& x) const = 0; +}; + +/** + * Interface for internal collections + */ +template<typename T> +class DrillCollectionImpl: private boost::noncopyable { +public: + // STL-like iterator typedef + typedef DrillIteratorImpl<T> iterator; + typedef boost::shared_ptr<iterator> iterator_ptr; + typedef DrillIteratorImpl<const T> const_iterator; + typedef boost::shared_ptr<const_iterator> const_iterator_ptr; + + typedef T value_type; + typedef value_type& reference; + typedef const value_type& const_reference; + typedef value_type* pointer; + typedef const value_type* const_pointer; + typedef int size_type; + + virtual ~DrillCollectionImpl() {} + + virtual iterator_ptr begin() = 0; + virtual const_iterator_ptr begin() const = 0; + virtual iterator_ptr end() = 0; + virtual const_iterator_ptr end() const = 0; +}; +} // namespace internal + +template<typename T> +class DrillCollection; + +template<typename T> +class DrillIterator: public std::iterator<std::input_iterator_tag, T> { +public: + typedef impl::DrillIteratorImpl<T> Impl; + typedef boost::shared_ptr<Impl> ImplPtr; + + typedef DrillIterator<T> iterator; + typedef std::iterator<std::input_iterator_tag, T> superclass; + typedef typename superclass::reference reference; + typedef typename superclass::pointer pointer; + + // Default constructor + DrillIterator(): m_pImpl() {}; + ~DrillIterator() {} + + // Iterators are CopyConstructible and CopyAssignable + DrillIterator(const iterator& it): m_pImpl(it.m_pImpl) {} + iterator& operator=(const iterator& it) { + m_pImpl = it.m_pImpl; + return *this; + } + + template<typename U> + DrillIterator(const DrillIterator<U>& it): m_pImpl(*it.m_pImpl) {} + + reference operator*() const { return m_pImpl->operator*(); } + pointer operator->() const { return m_pImpl->operator->(); } + + iterator& operator++() { m_pImpl->operator++(); return *this; } + + bool operator==(const iterator& x) const { + if (m_pImpl == x.m_pImpl) { + return true; + } + return m_pImpl && m_pImpl->operator==(*x.m_pImpl); + } + + bool operator!=(const iterator& x) const { + if (m_pImpl == x.m_pImpl) { + return false; + } + return !m_pImpl || m_pImpl->operator!=(*x.m_pImpl); + } + +private: + template<typename U> + friend class DrillCollection; + template<typename U> + friend class DrillIterator; + + ImplPtr m_pImpl; + + template<typename U> + DrillIterator(const boost::shared_ptr<impl::DrillIteratorImpl<U> >& pImpl): m_pImpl(pImpl) {} +}; + +template<typename T> +class DrillCollection { +public: + typedef impl::DrillCollectionImpl<T> Impl; + typedef boost::shared_ptr<Impl> ImplPtr; + + // STL-like iterator typedef + typedef DrillIterator<T> iterator; + typedef DrillIterator<const T> const_iterator; + typedef T value_type; + typedef value_type& reference; + typedef const value_type& const_reference; + typedef value_type* pointer; + typedef const value_type* const_pointer; + typedef int size_type; + + iterator begin() { return iterator(m_pImpl->begin()); } + const_iterator begin() const { return const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->begin()); } + iterator end() { return iterator(m_pImpl->end()); } + const_iterator end() const { return const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->end()); } + +protected: + DrillCollection(const ImplPtr& impl): m_pImpl(impl) {} + + Impl& operator*() { return *m_pImpl; } + const Impl& operator*() const { return *m_pImpl; } + Impl* operator->() { return m_pImpl.get(); } + const Impl* operator->() const { return m_pImpl.get(); } + +private: + ImplPtr m_pImpl; +}; + + +} /* namespace Drill */ +#endif /* _DRILL_COLLECTIONS_H */ http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/common.hpp ---------------------------------------------------------------------- diff --git a/contrib/native/client/src/include/drill/common.hpp b/contrib/native/client/src/include/drill/common.hpp index a617dc7..6d3816e 100644 --- a/contrib/native/client/src/include/drill/common.hpp +++ b/contrib/native/client/src/include/drill/common.hpp @@ -20,6 +20,24 @@ #ifndef _COMMON_H_ #define _COMMON_H_ +#if defined _WIN32 || defined __CYGWIN__ + #ifdef DRILL_CLIENT_EXPORTS + #define DECLSPEC_DRILL_CLIENT __declspec(dllexport) + #else + #ifdef USE_STATIC_LIBDRILL + #define DECLSPEC_DRILL_CLIENT + #else + #define DECLSPEC_DRILL_CLIENT __declspec(dllimport) + #endif + #endif +#else + #if __GNUC__ >= 4 + #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default"))) + #else + #define DECLSPEC_DRILL_CLIENT + #endif +#endif + #ifdef _WIN32 // The order of inclusion is important. Including winsock2 before everything else // ensures that the correct typedefs are defined and that the older typedefs defined
