http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/metadata.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/metadata.hpp 
b/contrib/native/client/src/clientlib/metadata.hpp
new file mode 100644
index 0000000..0cc8987
--- /dev/null
+++ b/contrib/native/client/src/clientlib/metadata.hpp
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#ifndef DRILL_METADATA_H
+#define DRILL_METADATA_H
+
+#include <boost/ref.hpp>
+
+#include "drill/common.hpp"
+#include "drill/drillClient.hpp"
+#include "env.h"
+#include "User.pb.h"
+
+namespace Drill {
+class DrillClientImpl;
+
+namespace meta {
+       class DrillCatalogMetadata: public meta::CatalogMetadata { // Derives from meta::CatalogMetadata and forwards every accessor to a wrapped ::exec::user::CatalogMetadata protobuf message.
+       public:
+               DrillCatalogMetadata(const ::exec::user::CatalogMetadata& 
metadata):
+                       meta::CatalogMetadata(),
+                       m_pMetadata(metadata){ // Stores a reference wrapper around the message; the message itself is not copied.
+               }
+
+         bool hasCatalogName() const { return 
m_pMetadata.get().has_catalog_name(); } // Each has*() below forwards to the matching protobuf has_*() accessor.
+         const std::string& getCatalogName() const { return 
m_pMetadata.get().catalog_name(); } // String getters hand back the reference returned by the protobuf accessor.
+
+         bool hasDescription() const { return 
m_pMetadata.get().has_description(); }
+         const std::string& getDescription() const { return 
m_pMetadata.get().description(); }
+
+         bool hasConnect() const { return m_pMetadata.get().has_connect(); }
+         const std::string& getConnect() const { return 
m_pMetadata.get().connect(); }
+
+       private:
+               boost::reference_wrapper<const ::exec::user::CatalogMetadata> 
m_pMetadata; // Non-owning reference: the wrapped message must outlive this object.
+       };
+
+       class DrillSchemaMetadata: public meta::SchemaMetadata { // Derives from meta::SchemaMetadata and forwards every accessor to a wrapped ::exec::user::SchemaMetadata protobuf message.
+       public:
+               DrillSchemaMetadata(const ::exec::user::SchemaMetadata& 
metadata):
+                       meta::SchemaMetadata(),
+                       m_pMetadata(metadata){ // Stores a reference wrapper around the message; the message itself is not copied.
+               }
+
+               bool hasCatalogName() const { return 
m_pMetadata.get().has_catalog_name(); } // Each has*() below forwards to the matching protobuf has_*() accessor.
+               const std::string& getCatalogName() const { return 
m_pMetadata.get().catalog_name(); } // String getters hand back the reference returned by the protobuf accessor.
+
+               bool hasSchemaName() const { return 
m_pMetadata.get().has_schema_name(); }
+               const std::string& getSchemaName() const { return 
m_pMetadata.get().schema_name(); }
+
+               bool hasOwnerName() const { return 
m_pMetadata.get().has_owner(); } // NOTE(review): named hasOwnerName while the getter is getOwner; presumably dictated by meta::SchemaMetadata - confirm.
+               const std::string& getOwner() const { return 
m_pMetadata.get().owner(); }
+
+               bool hasType() const { return m_pMetadata.get().has_type(); }
+               const std::string& getType() const { return 
m_pMetadata.get().type(); }
+
+               bool hasMutable() const { return 
m_pMetadata.get().has_mutable_(); } // The protobuf accessors for this field are spelled with a trailing underscore (has_mutable_ / mutable_).
+               const std::string& getMutable() const { return 
m_pMetadata.get().mutable_(); } // Returned as a string reference, not as a bool.
+
+       private:
+               boost::reference_wrapper<const ::exec::user::SchemaMetadata> 
m_pMetadata; // Non-owning reference: the wrapped message must outlive this object.
+       };
+
+       class DrillTableMetadata: public meta::TableMetadata { // Derives from meta::TableMetadata and forwards every accessor to a wrapped ::exec::user::TableMetadata protobuf message.
+       public:
+               DrillTableMetadata(const ::exec::user::TableMetadata& metadata):
+                       meta::TableMetadata(),
+                       m_pMetadata(metadata){ // Stores a reference wrapper around the message; the message itself is not copied.
+               }
+
+         bool hasCatalogName() const { return 
m_pMetadata.get().has_catalog_name(); } // Each has*() below forwards to the matching protobuf has_*() accessor.
+         const std::string& getCatalogName() const { return 
m_pMetadata.get().catalog_name(); } // String getters hand back the reference returned by the protobuf accessor.
+
+         bool hasSchemaName() const { return 
m_pMetadata.get().has_schema_name(); }
+         const std::string& getSchemaName() const { return 
m_pMetadata.get().schema_name(); }
+
+         bool hasTableName() const { return 
m_pMetadata.get().has_table_name(); }
+         const std::string& getTableName() const { return 
m_pMetadata.get().table_name(); }
+
+         bool hasType() const { return m_pMetadata.get().has_type(); }
+         const std::string& getType() const { return m_pMetadata.get().type(); 
}
+
+       private:
+         boost::reference_wrapper<const ::exec::user::TableMetadata> 
m_pMetadata; // Non-owning reference: the wrapped message must outlive this object.
+       };
+
+       class DrillColumnMetadata: public meta::ColumnMetadata { // Derives from meta::ColumnMetadata and forwards every accessor to a wrapped ::exec::user::ColumnMetadata protobuf message.
+       public:
+               DrillColumnMetadata(const ::exec::user::ColumnMetadata& 
metadata):
+                       meta::ColumnMetadata(),
+                       m_pMetadata(metadata){ // Stores a reference wrapper around the message; the message itself is not copied.
+               }
+
+               bool hasCatalogName() const { return 
m_pMetadata.get().has_catalog_name(); } // Each has*() below forwards to the matching protobuf has_*() accessor.
+               const std::string& getCatalogName() const { return 
m_pMetadata.get().catalog_name(); } // String getters hand back the reference returned by the protobuf accessor.
+
+               bool hasSchemaName() const { return 
m_pMetadata.get().has_schema_name(); }
+               const std::string& getSchemaName() const { return 
m_pMetadata.get().schema_name(); }
+
+               bool hasTableName() const { return 
m_pMetadata.get().has_table_name(); }
+               const std::string& getTableName() const { return 
m_pMetadata.get().table_name(); }
+
+               bool hasColumnName() const { return 
m_pMetadata.get().has_column_name(); }
+               const std::string& getColumnName() const { return 
m_pMetadata.get().column_name(); }
+
+               bool hasOrdinalPosition() const { return 
m_pMetadata.get().has_ordinal_position(); }
+               std::size_t getOrdinalPosition() const { return 
m_pMetadata.get().ordinal_position(); } // NOTE(review): the protobuf value is returned as std::size_t; presumably the field is a signed integer - confirm it can never be negative.
+
+               bool hasDefaultValue() const { return 
m_pMetadata.get().has_default_value(); }
+               const std::string& getDefaultValue() const { return 
m_pMetadata.get().default_value(); }
+
+               bool hasNullable() const { return 
m_pMetadata.get().has_is_nullable(); } // hasNullable()/isNullable() both map to the protobuf field is_nullable.
+               bool isNullable() const { return 
m_pMetadata.get().is_nullable(); }
+
+               bool hasDataType() const { return 
m_pMetadata.get().has_data_type(); }
+               const std::string& getDataType() const { return 
m_pMetadata.get().data_type(); }
+
+               bool hasColumnSize() const { return 
m_pMetadata.get().has_column_size(); }
+               std::size_t getColumnSize() const { return 
m_pMetadata.get().column_size(); } // Same std::size_t conversion as getOrdinalPosition(); also applies to the two char-length getters below.
+
+               bool hasCharMaxLength() const { return 
m_pMetadata.get().has_char_max_length(); }
+               std::size_t getCharMaxLength() const { return 
m_pMetadata.get().char_max_length(); }
+
+               bool hasCharOctetLength() const { return 
m_pMetadata.get().has_char_octet_length(); }
+               std::size_t getCharOctetLength() const { return 
m_pMetadata.get().char_octet_length(); }
+
+               bool hasNumericPrecision() const { return 
m_pMetadata.get().has_numeric_precision(); }
+               int32_t getNumericPrecision() const { return 
m_pMetadata.get().numeric_precision(); }
+
+               bool hasNumericRadix() const { return 
m_pMetadata.get().has_numeric_precision_radix(); } // The radix accessors map to the protobuf field numeric_precision_radix.
+               int32_t getNumericRadix() const { return 
m_pMetadata.get().numeric_precision_radix(); }
+
+               bool hasNumericScale() const { return 
m_pMetadata.get().has_numeric_scale(); }
+               int32_t getNumericScale() const { return 
m_pMetadata.get().numeric_scale(); }
+
+               bool hasIntervalType() const { return 
m_pMetadata.get().has_interval_type(); }
+               const std::string& getIntervalType() const { return 
m_pMetadata.get().interval_type(); }
+
+               bool hasIntervalPrecision() const { return 
m_pMetadata.get().has_interval_precision(); }
+               int32_t getIntervalPrecision() const { return 
m_pMetadata.get().interval_precision(); }
+
+       private:
+               boost::reference_wrapper<const ::exec::user::ColumnMetadata> 
m_pMetadata; // Non-owning reference: the wrapped message must outlive this object.
+       };
+
+    class DrillMetadata: public Metadata { // Metadata subclass bound to one DrillClientImpl; most capability getters below return hard-coded constants.
+    public:
+        static const std::string s_connectorName; // The s_* statics are only declared in this header; the inline getters below return them by const reference.
+        static const std::string s_connectorVersion; 
+
+        static const std::string s_serverName;
+        static const std::string s_serverVersion;
+
+        static const std::string s_catalogSeparator;
+        static const std::string s_catalogTerm;
+
+        static const std::string s_identifierQuoteString;
+        static const std::vector<std::string> s_sqlKeywords;
+        static const std::vector<std::string> s_numericFunctions;
+        static const std::string s_schemaTerm;
+        static const std::string s_searchEscapeString;
+        static const std::string s_specialCharacters;
+        static const std::vector<std::string> s_stringFunctions;
+        static const std::vector<std::string> s_systemFunctions;
+        static const std::string s_tableTerm;
+        static const std::vector<std::string> s_dateTimeFunctions;
+
+        DrillMetadata(DrillClientImpl& client): Metadata(), m_client(client) {} // Keeps a reference to the client; nothing is copied.
+        ~DrillMetadata() {}
+
+        DrillClientImpl& client() { return m_client; } // Non-const accessor to the bound client.
+
+        const std::string& getConnectorName() const { return s_connectorName; 
}; // NOTE(review): the ';' after the function body is redundant.
+        const std::string& getConnectorVersion() const { return 
s_connectorVersion; }
+        uint32_t getConnectorMajorVersion() const { return 
DRILL_VERSION_MAJOR; } 
+        uint32_t getConnectorMinorVersion() const { return 
DRILL_VERSION_MINOR; } 
+        uint32_t getConnectorPatchVersion() const { return 
DRILL_VERSION_PATCH; } // NOTE(review): DRILL_VERSION_* are presumably supplied by env.h - confirm.
+
+        const std::string& getServerName() const;
+        const std::string& getServerVersion() const;
+        uint32_t getServerMajorVersion() const;
+        uint32_t getServerMinorVersion() const;
+        uint32_t getServerPatchVersion() const; // The server name/version getters are declared only; no definitions appear in this header.
+
+        status_t getCatalogs(const std::string& catalogPattern, 
Metadata::pfnCatalogMetadataListener listener, void* listenerCtx, 
QueryHandle_t* qHandle);
+        status_t getSchemas(const std::string& catalogPattern, const 
std::string& schemaPattern, Metadata::pfnSchemaMetadataListener listener, void* 
listenerCtx, QueryHandle_t* qHandle);
+        status_t getTables(const std::string& catalogPattern, const 
std::string& schemaPattern, const std::string& tablePattern, const 
std::vector<std::string>* tableTypes, Metadata::pfnTableMetadataListener 
listener, void* listenerCtx, QueryHandle_t* qHandle);
+        status_t getColumns(const std::string& catalogPattern, const 
std::string& schemaPattern, const std:: string& tablePattern, const 
std::string& columnPattern, Metadata::pfnColumnMetadataListener listener, void* 
listenerCtx, QueryHandle_t* qHandle); // The four get* queries take pattern strings plus a listener callback, a listener context and a QueryHandle_t*; they are declared only.
+
+        bool areAllTableSelectable() const { return false; }
+        bool isCatalogAtStart() const { return true; }
+        const std::string& getCatalogSeparator() const { return 
s_catalogSeparator; }
+        const std::string& getCatalogTerm() const { return s_catalogTerm; }
+        bool isColumnAliasingSupported() const { return true; }
+        bool isNullPlusNonNullNull() const { return true; }
+        bool isConvertSupported(common::MinorType from, common::MinorType to) 
const;
+        meta::CorrelationNamesSupport getCorrelationNames() const { return 
meta::CN_ANY_NAMES; }
+        bool isReadOnly() const { return false; }
+        meta::DateTimeLiteralSupport getDateTimeLiteralsSupport() const {
+            return DL_DATE
+                | DL_TIME
+                | DL_TIMESTAMP
+                | DL_INTERVAL_YEAR
+                | DL_INTERVAL_MONTH
+                | DL_INTERVAL_DAY
+                | DL_INTERVAL_HOUR
+                | DL_INTERVAL_MINUTE
+                | DL_INTERVAL_SECOND
+                | DL_INTERVAL_YEAR_TO_MONTH
+                | DL_INTERVAL_DAY_TO_HOUR
+                | DL_INTERVAL_DAY_TO_MINUTE
+                | DL_INTERVAL_DAY_TO_SECOND
+                | DL_INTERVAL_HOUR_TO_MINUTE
+                | DL_INTERVAL_HOUR_TO_SECOND
+                | DL_INTERVAL_MINUTE_TO_SECOND;
+        } // NOTE(review): relies on operator| over the DL_* flags yielding meta::DateTimeLiteralSupport - confirm in drill/drillClient.hpp.
+
+        meta::CollateSupport getCollateSupport() const { return meta::C_NONE; 
}// supported?
+        meta::GroupBySupport getGroupBySupport() const { return 
meta::GB_UNRELATED; }
+        meta::IdentifierCase getIdentifierCase() const { return 
meta::IC_STORES_UPPER; } // to check?
+
+        const std::string& getIdentifierQuoteString() const { return 
s_identifierQuoteString; }
+        const std::vector<std::string>& getSQLKeywords() const { return 
s_sqlKeywords; }
+        bool isLikeEscapeClauseSupported() const { return true; }
+        std::size_t getMaxBinaryLiteralLength() const { return 0; } // NOTE(review): every getMax* limit returns 0; presumably 0 means "no known limit" - confirm against the Metadata contract.
+        std::size_t getMaxCatalogNameLength() const { return 0; }
+        std::size_t getMaxCharLiteralLength() const { return 0; }
+        std::size_t getMaxColumnNameLength() const { return 0; }
+        std::size_t getMaxColumnsInGroupBy() const { return 0; }
+        std::size_t getMaxColumnsInOrderBy() const { return 0; }
+        std::size_t getMaxColumnsInSelect() const { return 0; }
+        std::size_t getMaxCursorNameLength() const { return 0; }
+        std::size_t getMaxLogicalLobSize() const { return 0; }
+        std::size_t getMaxStatements() const { return 0; }
+        std::size_t getMaxRowSize() const { return 0; }
+        bool isBlobIncludedInMaxRowSize() const { return true; }
+        std::size_t getMaxSchemaNameLength() const { return 0; }
+        std::size_t getMaxStatementLength() const { return 0; }
+        std::size_t getMaxTableNameLength() const { return 0; }
+        std::size_t getMaxTablesInSelect() const { return 0; }
+        std::size_t getMaxUserNameLength() const { return 0; }
+        meta::NullCollation getNullCollation() const { return meta::NC_AT_END; 
}
+        const std::vector<std::string>& getNumericFunctions() const { return 
s_numericFunctions; }
+        meta::OuterJoinSupport getOuterJoinSupport() const { return 
meta::OJ_LEFT 
+            | meta::OJ_RIGHT 
+            | meta::OJ_FULL;
+        }
+        bool isUnrelatedColumnsInOrderBySupported() const { return true; }
+        meta::QuotedIdentifierCase getQuotedIdentifierCase() const { return 
meta::QIC_SUPPORTS_MIXED; }
+        const std::string& getSchemaTerm() const { return s_schemaTerm; }
+        const std::string& getSearchEscapeString() const { return 
s_searchEscapeString; }
+        const std::string& getSpecialCharacters() const { return 
s_specialCharacters; }
+        const std::vector<std::string>& getStringFunctions() const { return 
s_stringFunctions; }
+        meta::SubQuerySupport getSubQuerySupport() const { return SQ_CORRELATED
+                | SQ_IN_COMPARISON
+                | SQ_IN_EXISTS
+                | SQ_IN_QUANTIFIED;
+        } // The SQ_* flags are written without the meta:: qualifier; this class is declared inside namespace meta.
+        const std::vector<std::string>& getSystemFunctions() const { return 
s_systemFunctions; }
+        const std::string& getTableTerm() const { return s_tableTerm; }
+        const std::vector<std::string>& getDateTimeFunctions() const { return 
s_dateTimeFunctions; }
+        bool isTransactionSupported() const { return false; }
+        meta::UnionSupport getUnionSupport() const { return meta::U_UNION | 
meta::U_UNION_ALL; }
+        bool isSelectForUpdateSupported() const { return false; }
+
+    private:
+        DrillClientImpl& m_client; // Reference member: the client must outlive this object.
+    };
+} // namespace meta
+} // namespace Drill
+
+#endif // DRILL_METADATA_H

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/recordBatch.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/recordBatch.cpp 
b/contrib/native/client/src/clientlib/recordBatch.cpp
index c6c033b..6e13293 100644
--- a/contrib/native/client/src/clientlib/recordBatch.cpp
+++ b/contrib/native/client/src/clientlib/recordBatch.cpp
@@ -17,6 +17,7 @@
  */
 
 #include "drill/common.hpp"
+#include "drill/fieldmeta.hpp"
 #include "drill/recordBatch.hpp"
 #include "utils.hpp"
 #include "../protobuf/User.pb.h"
@@ -403,17 +404,6 @@ bool RecordBatch::isLastChunk(){
 
 
 
-void FieldMetadata::set(const exec::shared::SerializedField& f){
-    m_name=f.name_part().name();
-    m_minorType=f.major_type().minor_type();
-    m_dataMode=f.major_type().mode();
-    m_valueCount=f.value_count();
-    m_scale=f.major_type().scale();
-    m_precision=f.major_type().precision();
-    m_bufferLength=f.buffer_length();
-}
-
-
 void DateHolder::load(){
     m_year=1970;
     m_month=1;

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.cpp 
b/contrib/native/client/src/clientlib/rpcDecoder.cpp
deleted file mode 100644
index d3cf50c..0000000
--- a/contrib/native/client/src/clientlib/rpcDecoder.cpp
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <iostream>
-#include <google/protobuf/io/coded_stream.h>
-#include "drill/common.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcDecoder.hpp"
-#include "rpcMessage.hpp"
-
-namespace Drill{
-
-// return the number of bytes we have read
-int RpcDecoder::LengthDecode(const uint8_t* buf, uint32_t* p_length) {
-
-    using google::protobuf::io::CodedInputStream;
-
-    // read the frame to get the length of the message and then
-
-    CodedInputStream* cis = new CodedInputStream(buf, 5); // read 5 bytes at 
most
-
-    int pos0 = cis->CurrentPosition(); // for debugging
-    cis->ReadVarint32(p_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "p_length = " << *p_length << endl;
-    #endif
-
-    int pos1 = cis->CurrentPosition();
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading full length " << *p_length << endl;
-    #endif
-    assert( (pos1-pos0) == getRawVarintSize(*p_length));
-    delete cis;
-    return (pos1-pos0);
-}
-
-// TODO: error handling
-//
-// - assume that the entire message is in the buffer and the buffer is 
constrained to this message
-// - easy to handle with raw arry in C++
-int RpcDecoder::Decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) 
{
-    using google::protobuf::io::CodedInputStream;
-
-    // if(!ctx.channel().isOpen()){ return; }
-
-    #ifdef  EXTRA_DEBUGGING
-    std::cerr <<  "\nInbound rpc message received." << std::endl;
-    #endif
-
-    CodedInputStream* cis = new CodedInputStream(buf, length);
-
-
-    int pos0 = cis->CurrentPosition(); // for debugging
-
-    int len_limit = cis->PushLimit(length);
-
-    uint32_t header_length = 0;
-    cis->ExpectTag(RpcEncoder::HEADER_TAG);
-    cis->ReadVarint32(&header_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading header length " << header_length << ", post read index " 
<< cis->CurrentPosition() << endl;
-    #endif
-
-    exec::rpc::RpcHeader header;
-    int header_limit = cis->PushLimit(header_length);
-    header.ParseFromCodedStream(cis);
-    cis->PopLimit(header_limit);
-    msg.m_has_mode = header.has_mode();
-    msg.m_mode = header.mode();
-    msg.m_coord_id = header.coordination_id();
-    msg.m_has_rpc_type = header.has_rpc_type();
-    msg.m_rpc_type = header.rpc_type();
-
-    //if(RpcConstants.EXTRA_DEBUGGING) logger.debug(" post header read index 
{}", buffer.readerIndex());
-
-    // read the protobuf body into a buffer.
-    cis->ExpectTag(RpcEncoder::PROTOBUF_BODY_TAG);
-    uint32_t p_body_length = 0;
-    cis->ReadVarint32(&p_body_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Reading protobuf body length " << p_body_length << ", post read 
index " << cis->CurrentPosition() << endl;
-    #endif
-
-    msg.m_pbody.resize(p_body_length);
-    cis->ReadRaw(msg.m_pbody.data(),p_body_length);
-
-
-    // read the data body.
-    if (cis->BytesUntilLimit() > 0 ) {
-    #ifdef CODER_DEBUG
-        cerr << "Reading raw body, buffer has "<< cis->BytesUntilLimit() << " 
bytes available, current possion "<< cis->CurrentPosition()  << endl;
-    #endif
-        cis->ExpectTag(RpcEncoder::RAW_BODY_TAG);
-        uint32_t d_body_length = 0;
-        cis->ReadVarint32(&d_body_length);
-
-        if(cis->BytesUntilLimit() != d_body_length) {
-    #ifdef CODER_DEBUG
-            cerr << "Expected to receive a raw body of " << d_body_length << " 
bytes but received a buffer with " <<cis->BytesUntilLimit() << " bytes." << 
endl;
-    #endif
-        }
-        //msg.m_dbody.resize(d_body_length);
-        //cis->ReadRaw(msg.m_dbody.data(), d_body_length);
-        uint32_t currPos=cis->CurrentPosition();
-        cis->GetDirectBufferPointer((const void**)&msg.m_dbody, 
(int*)&d_body_length);
-        assert(msg.m_dbody==buf+currPos);
-        cis->Skip(d_body_length);
-    #ifdef CODER_DEBUG
-        cerr << "Read raw body of " << d_body_length << " bytes" << endl;
-    #endif
-    } else {
-    #ifdef CODER_DEBUG
-        cerr << "No need to read raw body, no readable bytes left." << endl;
-    #endif
-    }
-    cis->PopLimit(len_limit);
-
-
-    // return the rpc message.
-    // move the reader index forward so the next rpc call won't try to work 
with it.
-    // buffer.skipBytes(dBodyLength);
-    // messageCounter.incrementAndGet();
-    #ifdef CODER_DEBUG
-    cerr << "Inbound Rpc Message Decoded " << msg << endl;
-    #endif
-
-    int pos1 = cis->CurrentPosition();
-    assert((pos1-pos0) == length);
-    delete cis;
-    return (pos1-pos0);
-}
-
-}//namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcDecoder.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcDecoder.hpp 
b/contrib/native/client/src/clientlib/rpcDecoder.hpp
deleted file mode 100644
index dca49f7..0000000
--- a/contrib/native/client/src/clientlib/rpcDecoder.hpp
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef RPC_DECODER_H
-#define RPC_DECODER_H
-
-#include "rpcMessage.hpp"
-
-namespace Drill {
-
-class RpcDecoder {
-    public:
-        RpcDecoder() { }
-        ~RpcDecoder() { }
-        // bool Decode(const DataBuf& buf);
-        // bool Decode(const DataBuf& buf, InBoundRpcMessage& msg);
-        static int LengthDecode(const uint8_t* buf, uint32_t* length); // read 
the length prefix (at most 4 bytes)
-        static int Decode(const uint8_t* buf, int length, InBoundRpcMessage& 
msg);
-};
-
-} // namespace Drill
-#endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcEncoder.cpp 
b/contrib/native/client/src/clientlib/rpcEncoder.cpp
deleted file mode 100644
index 2f354d7..0000000
--- a/contrib/native/client/src/clientlib/rpcEncoder.cpp
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#include <google/protobuf/io/coded_stream.h>
-#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
-#include <google/protobuf/message_lite.h>
-#include <google/protobuf/wire_format_lite.h>
-
-#include "drill/common.hpp"
-#include "rpcEncoder.hpp"
-#include "rpcMessage.hpp"
-
-namespace Drill{
-
-using google::protobuf::internal::WireFormatLite;
-using exec::rpc::CompleteRpcMessage;
-
-const uint32_t RpcEncoder::HEADER_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::PROTOBUF_BODY_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::RAW_BODY_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
-const uint32_t RpcEncoder::HEADER_TAG_LENGTH = getRawVarintSize(HEADER_TAG);
-const uint32_t RpcEncoder::PROTOBUF_BODY_TAG_LENGTH = 
getRawVarintSize(PROTOBUF_BODY_TAG);
-const uint32_t RpcEncoder::RAW_BODY_TAG_LENGTH = 
getRawVarintSize(RAW_BODY_TAG);
-
-
-bool RpcEncoder::Encode(DataBuf& buf, OutBoundRpcMessage& msg) {
-    using exec::rpc::RpcHeader;
-    using google::protobuf::io::CodedOutputStream;
-    using google::protobuf::io::ArrayOutputStream;
-    // Todo:
-    //
-    // - let a context manager to allocate a buffer `ByteBuf buf = 
ctx.alloc().buffer();`
-    // - builder pattern
-    //
-    #ifdef CODER_DEBUG
-    cerr << "\nEncoding outbound message " << msg << endl;
-    #endif
-
-    RpcHeader header;
-    header.set_mode(msg.m_mode);
-    header.set_coordination_id(msg.m_coord_id);
-    header.set_rpc_type(msg.m_rpc_type);
-
-    // calcute the length of the message
-    int header_length = header.ByteSize();
-    int proto_body_length = msg.m_pbody->ByteSize();
-    int full_length = HEADER_TAG_LENGTH + getRawVarintSize(header_length) + 
header_length + \
-                      PROTOBUF_BODY_TAG_LENGTH + 
getRawVarintSize(proto_body_length) + proto_body_length;
-
-    /*
-       if(raw_body_length > 0) {
-       full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length) 
+ raw_body_length);
-       }
-       */
-
-    buf.resize(full_length + getRawVarintSize(full_length));
-    ArrayOutputStream* os = new ArrayOutputStream(buf.data(), buf.size());
-    CodedOutputStream* cos = new CodedOutputStream(os);
-
-
-    #ifdef CODER_DEBUG
-    cerr << "Writing full length " << full_length << endl;
-    #endif
-
-    // write full length first (this is length delimited stream).
-    cos->WriteVarint32(full_length);
-
-    #ifdef CODER_DEBUG
-    cerr << "Writing header length " << header_length << endl;
-    #endif
-
-    cos->WriteVarint32(HEADER_TAG);
-    cos->WriteVarint32(header_length);
-
-    header.SerializeToCodedStream(cos);
-
-    // write protobuf body length and body
-    #ifdef CODER_DEBUG
-    cerr << "Writing protobuf body length " << proto_body_length << endl;
-    #endif
-
-    cos->WriteVarint32(PROTOBUF_BODY_TAG);
-    cos->WriteVarint32(proto_body_length);
-    msg.m_pbody->SerializeToCodedStream(cos);
-
-    delete os;
-    delete cos;
-
-    // Done! no read to write data body for client
-    return true;
-}
-
-} // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcEncoder.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcEncoder.hpp 
b/contrib/native/client/src/clientlib/rpcEncoder.hpp
deleted file mode 100644
index a4a7216..0000000
--- a/contrib/native/client/src/clientlib/rpcEncoder.hpp
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-#ifndef RPC_ENCODER_H
-#define RPC_ENCODER_H
-
-#include "rpcMessage.hpp"
-
-namespace Drill {
-
-class RpcEncoder {
-    public:
-        RpcEncoder() {}
-        ~RpcEncoder() { }
-        bool Encode(DataBuf& buf,OutBoundRpcMessage& msg);
-        static const uint32_t HEADER_TAG;
-        static const uint32_t PROTOBUF_BODY_TAG;
-        static const uint32_t RAW_BODY_TAG;
-        static const uint32_t HEADER_TAG_LENGTH;
-        static const uint32_t PROTOBUF_BODY_TAG_LENGTH;
-        static const uint32_t RAW_BODY_TAG_LENGTH;
-};
-
-// copy from java code
-inline int getRawVarintSize(uint32_t value) {
-    int count = 0;
-    while (true) {
-        if ((value & ~0x7F) == 0) {
-            count++;
-            return count;
-        } else {
-            count++;
-            value >>= 7;
-        }
-    }
-}
-
-} // namespace Drill
-#endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.cpp 
b/contrib/native/client/src/clientlib/rpcMessage.cpp
new file mode 100644
index 0000000..13cd7a8
--- /dev/null
+++ b/contrib/native/client/src/clientlib/rpcMessage.cpp
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <google/protobuf/io/coded_stream.h>
+#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
+#include <google/protobuf/message_lite.h>
+#include <google/protobuf/wire_format_lite.h>
+
+#include "drill/common.hpp"
+#include "rpcMessage.hpp"
+
+namespace Drill{
+namespace rpc {
+
+
+namespace {
+// File-local aliases and wire-format constants used by decode() and encode()
+// below.
+using google::protobuf::internal::WireFormatLite;
+using google::protobuf::io::CodedOutputStream;
+using exec::rpc::CompleteRpcMessage;
+
+// Tags of the three length-delimited CompleteRpcMessage fields: header,
+// protobuf body and raw body.
+static const uint32_t HEADER_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kHeaderFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+static const uint32_t PROTOBUF_BODY_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kProtobufBodyFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+static const uint32_t RAW_BODY_TAG = 
WireFormatLite::MakeTag(CompleteRpcMessage::kRawBodyFieldNumber, 
WireFormatLite::WIRETYPE_LENGTH_DELIMITED);
+// Varint-encoded size of the header and protobuf body tags; encode() adds
+// them up to compute the frame length.
+static const uint32_t HEADER_TAG_LENGTH = 
CodedOutputStream::VarintSize32(HEADER_TAG);
+static const uint32_t PROTOBUF_BODY_TAG_LENGTH = 
CodedOutputStream::VarintSize32(PROTOBUF_BODY_TAG);
+}
+
+// Reads the varint32 length prefix found at the start of buf into `length`.
+//
+// Returns the number of bytes the prefix occupies. When the varint cannot be
+// read, -1 is returned, which the std::size_t return type turns into its
+// largest value.
+std::size_t lengthDecode(const uint8_t* buf, uint32_t& length) {
+    using google::protobuf::io::CodedInputStream;
+    using google::protobuf::io::CodedOutputStream;
+
+    // The stream is capped at 5 bytes: the prefix is read from those only.
+    CodedInputStream input(buf, 5);
+
+    const int posBefore = input.CurrentPosition(); // for debugging
+    if (!input.ReadVarint32(&length)) {
+        return -1;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "length = " << length << std::endl;
+    #endif
+
+    const int consumed = input.CurrentPosition() - posBefore;
+
+    // The bytes consumed must match the canonical encoding of the value read.
+    assert(consumed == CodedOutputStream::VarintSize32(length));
+    return consumed;
+}
+
+// Decodes one inbound RPC frame (without its length prefix) into msg.
+//
+// - buf/length must delimit exactly one message: the entire message is
+//   assumed to be in the buffer and the buffer is constrained to it.
+// - The header fields and the protobuf body are copied into msg. A non-empty
+//   raw body is NOT copied: msg.m_dbody is set to point into buf.
+// - When there is no raw body, or it is empty, msg.m_dbody is left untouched.
+//
+// Returns false as soon as a tag, a length or the header cannot be read, or
+// when a declared length does not fit the frame.
+//
+// TODO: error handling (failures are only reported through the return value)
+bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg) {
+    using google::protobuf::io::CodedInputStream;
+
+    CodedInputStream cis(buf, length);
+
+    int startPos(cis.CurrentPosition()); // for debugging
+
+    CodedInputStream::Limit len_limit(cis.PushLimit(length));
+
+    uint32_t header_length(0);
+
+    if (!cis.ExpectTag(HEADER_TAG)) {
+        return false;
+    }
+
+    if (!cis.ReadVarint32(&header_length)) {
+        return false;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Reading header length " << header_length
+              << ", post read index " << cis.CurrentPosition() << std::endl;
+    #endif
+
+    // Parse the header under its own limit so that parsing stops at its end.
+    exec::rpc::RpcHeader header;
+    CodedInputStream::Limit header_limit(cis.PushLimit(header_length));
+
+    if (!header.ParseFromCodedStream(&cis)) {
+        return false;
+    }
+    cis.PopLimit(header_limit);
+
+    msg.m_has_mode = header.has_mode();
+    msg.m_mode = header.mode();
+    msg.m_coord_id = header.coordination_id();
+    msg.m_has_rpc_type = header.has_rpc_type();
+    msg.m_rpc_type = header.rpc_type();
+
+    // read the protobuf body into a buffer.
+    if (!cis.ExpectTag(PROTOBUF_BODY_TAG)) {
+        return false;
+    }
+
+    uint32_t pbody_length(0);
+    if (!cis.ReadVarint32(&pbody_length)) {
+        return false;
+    }
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Reading protobuf body length " << pbody_length
+              << ", post read index " << cis.CurrentPosition() << std::endl;
+    #endif
+
+    // The length comes off the wire: make sure it fits in what is left of the
+    // frame before allocating storage for it.
+    const int pbody_available = cis.BytesUntilLimit();
+    if (pbody_available < 0
+            || pbody_length > static_cast<uint32_t>(pbody_available)) {
+        return false;
+    }
+
+    msg.m_pbody.resize(pbody_length);
+    if (!cis.ReadRaw(msg.m_pbody.data(), pbody_length)) {
+        return false;
+    }
+
+    // read the data body.
+    if (cis.BytesUntilLimit() > 0 ) {
+        #ifdef CODER_DEBUG
+        std::cerr << "Reading raw body, buffer has " << cis.BytesUntilLimit()
+                  << " bytes available, current position "
+                  << cis.CurrentPosition() << std::endl;
+        #endif
+        if (!cis.ExpectTag(RAW_BODY_TAG)) {
+            return false;
+        }
+
+        uint32_t dbody_length = 0;
+        if (!cis.ReadVarint32(&dbody_length)) {
+            return false;
+        }
+
+        // The raw body must be exactly the rest of the frame.
+        if (static_cast<uint32_t>(cis.BytesUntilLimit()) != dbody_length) {
+            #ifdef CODER_DEBUG
+            std::cerr << "Expected to receive a raw body of " << dbody_length
+                      << " bytes but received a buffer with "
+                      << cis.BytesUntilLimit() << " bytes." << std::endl;
+            #endif
+            return false;
+        }
+
+        // With nothing left to read there is no buffer to point at, so only
+        // fetch the pointer for a non-empty raw body.
+        if (dbody_length > 0) {
+            int currPos(cis.CurrentPosition());
+            int size = 0;
+            if (!cis.GetDirectBufferPointer(
+                    const_cast<const void**>(
+                        reinterpret_cast<void**>(&msg.m_dbody)),
+                    &size)) {
+                return false;
+            }
+            cis.Skip(size);
+
+            assert(dbody_length == static_cast<uint32_t>(size));
+            assert(msg.m_dbody==buf+currPos);
+        }
+        #ifdef CODER_DEBUG
+        std::cerr << "Read raw body of " << dbody_length << " bytes"
+                  << std::endl;
+        #endif
+    } else {
+        #ifdef CODER_DEBUG
+        std::cerr << "No need to read raw body, no readable bytes left."
+                  << std::endl;
+        #endif
+    }
+    cis.PopLimit(len_limit);
+
+
+    // return the rpc message.
+    // move the reader index forward so the next rpc call won't try to work
+    // with it.
+    // buffer.skipBytes(dBodyLength);
+    // messageCounter.incrementAndGet();
+    #ifdef CODER_DEBUG
+    std::cerr << "Inbound Rpc Message Decoded " << msg << std::endl;
+    #endif
+
+    int endPos = cis.CurrentPosition();
+    assert((endPos-startPos) == length);
+    return true;
+}
+
+
+// Serializes msg into buf as one length-prefixed RPC frame.
+//
+// buf is resized to hold exactly, in this order:
+//   varint(full_length) HEADER_TAG varint(header_length) <header bytes>
+//   PROTOBUF_BODY_TAG varint(proto_body_length) <protobuf body bytes>
+// No raw (data) body is written. The function always returns true.
+bool encode(DataBuf& buf, const OutBoundRpcMessage& msg) {
+    using exec::rpc::RpcHeader;
+    using google::protobuf::io::CodedOutputStream;
+    // Todo:
+    //
+    // - let a context manager allocate the buffer
+    //   `ByteBuf buf = ctx.alloc().buffer();`
+    // - builder pattern
+    //
+    #ifdef CODER_DEBUG
+    std::cerr << "Encoding outbound message " << msg << std::endl;
+    #endif
+
+    RpcHeader header;
+    header.set_mode(msg.m_mode);
+    header.set_coordination_id(msg.m_coord_id);
+    header.set_rpc_type(msg.m_rpc_type);
+
+    // Calculate the length of the message. The ByteSize() calls must come
+    // before the SerializeWithCachedSizesToArray() calls further down, which
+    // reuse the sizes computed here.
+    int header_length = header.ByteSize();
+    int proto_body_length = msg.m_pbody->ByteSize();
+    int full_length = HEADER_TAG_LENGTH + 
CodedOutputStream::VarintSize32(header_length) + header_length + \
+                      PROTOBUF_BODY_TAG_LENGTH + 
CodedOutputStream::VarintSize32(proto_body_length) + proto_body_length;
+
+    /*
+       Disabled: accounting for a raw body, which the client does not send.
+       if(raw_body_length > 0) {
+       full_length += (RAW_BODY_TAG_LENGTH + getRawVarintSize(raw_body_length)
+       + raw_body_length);
+       }
+       */
+
+    // Room for the frame itself plus its own varint length prefix.
+    buf.resize(full_length + CodedOutputStream::VarintSize32(full_length));
+
+    // Write cursor: every writer below returns the position just past what
+    // it wrote.
+    uint8_t* data = buf.data();
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing full length " << full_length << std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(full_length, data);
+
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing header length " << header_length << std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(HEADER_TAG, data);
+    data = CodedOutputStream::WriteVarint32ToArray(header_length, data);
+
+    data = header.SerializeWithCachedSizesToArray(data);
+
+    // write protobuf body length and body
+    #ifdef CODER_DEBUG
+    std::cerr << "Writing protobuf body length " << proto_body_length << 
std::endl;
+    #endif
+
+    data = CodedOutputStream::WriteVarint32ToArray(PROTOBUF_BODY_TAG, data);
+    data = CodedOutputStream::WriteVarint32ToArray(proto_body_length, data);
+    msg.m_pbody->SerializeWithCachedSizesToArray(data);
+
+    // Done! No need to write a data body for the client.
+    return true;
+}
+} // namespace rpc
+} // namespace Drill

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/rpcMessage.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/rpcMessage.hpp 
b/contrib/native/client/src/clientlib/rpcMessage.hpp
index 6696971..15487e9 100644
--- a/contrib/native/client/src/clientlib/rpcMessage.hpp
+++ b/contrib/native/client/src/clientlib/rpcMessage.hpp
@@ -25,8 +25,8 @@
 #include "GeneralRPC.pb.h"
 
 namespace Drill {
-
-class InBoundRpcMessage {
+namespace rpc {
+struct InBoundRpcMessage {
     public:
         exec::rpc::RpcMode m_mode;
         int m_rpc_type;
@@ -39,7 +39,7 @@ class InBoundRpcMessage {
         bool has_rpc_type() { return m_has_rpc_type; };
 };
 
-class OutBoundRpcMessage {
+struct OutBoundRpcMessage {
     public:
         exec::rpc::RpcMode m_mode;
         int m_rpc_type;
@@ -49,6 +49,13 @@ class OutBoundRpcMessage {
             m_mode(mode), m_rpc_type(rpc_type), m_coord_id(coord_id), 
m_pbody(pbody) { }
 };
 
-}
+std::size_t lengthDecode(const uint8_t* buf, uint32_t& length);
+
+bool decode(const uint8_t* buf, int length, InBoundRpcMessage& msg);
+
+bool encode(DataBuf& buf, const OutBoundRpcMessage& msg);
+
+} // namespace rpc
+} // namespace Drill
 
 #endif

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.cpp 
b/contrib/native/client/src/clientlib/utils.cpp
index 1e6a877..d3c8f08 100644
--- a/contrib/native/client/src/clientlib/utils.cpp
+++ b/contrib/native/client/src/clientlib/utils.cpp
@@ -22,6 +22,13 @@
 #include "logger.hpp"
 #include "drill/common.hpp"
 
+#if defined _WIN32  || defined _WIN64
+//Windows header files redefine 'max'
+#ifdef max
+#undef max
+#endif
+#endif
+
 namespace Drill{
 
 

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/utils.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/utils.hpp 
b/contrib/native/client/src/clientlib/utils.hpp
index 3237aa3..4cd8fa5 100644
--- a/contrib/native/client/src/clientlib/utils.hpp
+++ b/contrib/native/client/src/clientlib/utils.hpp
@@ -31,7 +31,6 @@
     #undef random
   #endif
 #endif
-#include <boost/asio/deadline_timer.hpp>
 #include <boost/random/mersenne_twister.hpp> // for mt19937
 #include <boost/random/random_device.hpp>
 #include <boost/random/uniform_int.hpp>

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/y2038/time64.c
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/y2038/time64.c 
b/contrib/native/client/src/clientlib/y2038/time64.c
index e0d61c8..bbbabe2 100644
--- a/contrib/native/client/src/clientlib/y2038/time64.c
+++ b/contrib/native/client/src/clientlib/y2038/time64.c
@@ -110,15 +110,15 @@ static const int safe_years_low[SOLAR_CYCLE_LENGTH] = {
 };
 
 /* This isn't used, but it's handy to look at */
-static const char dow_year_start[SOLAR_CYCLE_LENGTH] = {
-    5, 0, 1, 2,     /* 0       2016 - 2019 */
-    3, 5, 6, 0,     /* 4  */
-    1, 3, 4, 5,     /* 8       1996 - 1998, 1971*/
-    6, 1, 2, 3,     /* 12      1972 - 1975 */
-    4, 6, 0, 1,     /* 16 */
-    2, 4, 5, 6,     /* 20      2036, 2037, 2010, 2011 */
-    0, 2, 3, 4      /* 24      2012, 2013, 2014, 2015 */
-};
+//static const char dow_year_start[SOLAR_CYCLE_LENGTH] = {
+//    5, 0, 1, 2,     /* 0       2016 - 2019 */
+//    3, 5, 6, 0,     /* 4  */
+//    1, 3, 4, 5,     /* 8       1996 - 1998, 1971*/
+//    6, 1, 2, 3,     /* 12      1972 - 1975 */
+//    4, 6, 0, 1,     /* 16 */
+//    2, 4, 5, 6,     /* 20      2036, 2037, 2010, 2011 */
+//    0, 2, 3, 4      /* 24      2012, 2013, 2014, 2015 */
+//};
 
 /* Let's assume people are going to be looking for dates in the future.
    Let's provide some cheats so you can skip ahead.

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.cpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/zookeeperClient.cpp 
b/contrib/native/client/src/clientlib/zookeeperClient.cpp
new file mode 100644
index 0000000..535bebc
--- /dev/null
+++ b/contrib/native/client/src/clientlib/zookeeperClient.cpp
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <boost/bind.hpp>
+#include <drill/drillClient.hpp>
+#include "zookeeperClient.hpp"
+
+#include "errmsgs.hpp"
+#include "logger.hpp"
+
+namespace Drill {
+std::string ZookeeperClient::s_defaultDrillPath("/drill/drillbits1");
+// Free-function trampoline handed to zookeeper_init(): `context` is the
+// ZookeeperClient that registered it, and the event is forwarded to its
+// watcher() member.
+static void watcherCallback(zhandle_t *zzh, int type, int state,
+                            const char *path, void* context) {
+    ZookeeperClient* const client = static_cast<ZookeeperClient*>(context);
+    client->watcher(zzh, type, state, path, context);
+}
+
+// Builds a client that holds no ZooKeeper handle yet. drillPath is the node
+// whose children getAllDrillbits() lists.
+ZookeeperClient::ZookeeperClient(const std::string& drillPath)
+: p_zh(), m_state(), m_bConnecting(true), m_path(drillPath) {
+    // Start from an all-zero client id; it is passed to zookeeper_init().
+    memset(&m_id, 0, sizeof(m_id));
+}
+
+// Nothing to release explicitly: the ZooKeeper handle is held by p_zh, a
+// shared_ptr created with zookeeper_close as its deleter.
+ZookeeperClient::~ZookeeperClient(){
+}
+
+// Maps the Drill client log level onto the ZooKeeper C client's levels:
+//   LOG_TRACE, LOG_DEBUG -> ZOO_LOG_LEVEL_DEBUG
+//   LOG_INFO             -> ZOO_LOG_LEVEL_INFO
+//   LOG_WARNING          -> ZOO_LOG_LEVEL_WARN
+//   anything else        -> ZOO_LOG_LEVEL_ERROR
+ZooLogLevel ZookeeperClient::getZkLogLevel(){
+    ZooLogLevel zkLevel = ZOO_LOG_LEVEL_ERROR;
+    switch(DrillClientConfig::getLogLevel()){
+        case LOG_TRACE:
+        case LOG_DEBUG:
+            zkLevel = ZOO_LOG_LEVEL_DEBUG;
+            break;
+        case LOG_INFO:
+            zkLevel = ZOO_LOG_LEVEL_INFO;
+            break;
+        case LOG_WARNING:
+            zkLevel = ZOO_LOG_LEVEL_WARN;
+            break;
+        default:
+            // LOG_ERROR, LOG_FATAL and any other value keep the error level.
+            break;
+    }
+    return zkLevel;
+}
+
+// ZooKeeper event handler, reached through watcherCallback().
+//
+// Records the reported state, stores an error message and drops the handle
+// on authentication failure or session expiry, then clears m_bConnecting and
+// wakes the thread blocked in getAllDrillbits().
+void ZookeeperClient::watcher(zhandle_t *zzh, int type, int state, const char 
*path, void*) {
+    //From cli.c
+
+    /* Be careful using zh here rather than zzh - as this may be mt code
+     * the client lib may call the watcher before zookeeper_init returns */
+
+    this->m_state=state;
+    if (type == ZOO_SESSION_EVENT) {
+        if (state == ZOO_CONNECTED_STATE) {
+            // Connected: nothing to record here; logged further down.
+        } else if (state == ZOO_AUTH_FAILED_STATE) {
+            this->m_err= getMessage(ERR_CONN_ZKNOAUTH);
+            this->close();
+        } else if (state == ZOO_EXPIRED_SESSION_STATE) {
+               this->m_err= getMessage(ERR_CONN_ZKEXP);
+               this->close();
+        }
+    }
+    // signal the cond var
+    {
+        if (state == ZOO_CONNECTED_STATE){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << 
std::endl;)
+        }
+        // The flag is changed under the mutex the waiter holds while testing
+        // it; the lock is released at the end of this scope, before notifying.
+        boost::lock_guard<boost::mutex> bufferLock(this->m_cvMutex);
+        this->m_bConnecting=false;
+    }
+    this->m_cv.notify_one();
+}
+
+// Connects to the ZooKeeper servers named by connectStr and appends the names
+// of the children of m_path to `drillbits`, in the order ZooKeeper returned
+// them (unshuffled).
+//
+// Returns 0 on success. Returns -1 on failure, in which case getError() holds
+// a description of what went wrong.
+int ZookeeperClient::getAllDrillbits(const std::string& connectStr,
+        std::vector<std::string>& drillbits){
+    uint32_t waitTime=30000; // 30 seconds, expressed in milliseconds
+    zoo_set_debug_level(getZkLogLevel());
+    zoo_deterministic_conn_order(1); // enable deterministic order
+
+    // Reset the per-attempt state BEFORE zookeeper_init(): the client library
+    // may invoke the watcher before zookeeper_init() returns, and anything it
+    // records (error message, "no longer connecting") must not be wiped out
+    // afterwards. Re-arming the flag also makes a repeated call wait again.
+    {
+        boost::lock_guard<boost::mutex> stateLock(this->m_cvMutex);
+        this->m_bConnecting=true;
+    }
+    m_err="";
+
+    p_zh = boost::shared_ptr<zhandle_t>(zookeeper_init(connectStr.c_str(),
+            &watcherCallback, waitTime, &m_id, this, 0), zookeeper_close);
+    if(!p_zh) {
+        m_err = getMessage(ERR_CONN_ZKFAIL);
+        return -1;
+    }
+
+    //Wait for the completion handler to signal successful connection
+    boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
+    boost::system_time const timeout=boost::get_system_time()+
+            boost::posix_time::milliseconds(waitTime);
+    while(this->m_bConnecting) {
+        if(!this->m_cv.timed_wait(bufferLock, timeout)){
+            m_err = getMessage(ERR_CONN_ZKTIMOUT);
+            return -1;
+        }
+    }
+
+    if(m_state!=ZOO_CONNECTED_STATE){
+        // The watcher only stores a message for authentication failure and
+        // session expiry; never hand back a failure without an explanation.
+        if(m_err.empty()){
+            m_err = getMessage(ERR_CONN_ZKFAIL);
+        }
+        return -1;
+    }
+
+    int rc = ZOK;
+
+    struct String_vector drillbitsVector;
+    rc=zoo_get_children(p_zh.get(), m_path.c_str(), 0, &drillbitsVector);
+    if(rc!=ZOK){
+        m_err=getMessage(ERR_CONN_ZKERR, rc);
+        p_zh.reset();
+        return -1;
+    }
+
+    // Make sure we deallocate drillbitsVector properly when we exit
+    boost::shared_ptr<String_vector> guard(&drillbitsVector,
+            deallocate_String_vector);
+
+    if(drillbitsVector.count > 0){
+        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << drillbitsVector.count
+                << " drillbits in cluster ("
+                << connectStr << "/" << m_path
+                << ")." <<std::endl;)
+        for(int i=0; i<drillbitsVector.count; i++){
+            drillbits.push_back(drillbitsVector.data[i]);
+        }
+        for(std::size_t i=0; i<drillbits.size(); i++){
+            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: "
+                    << drillbits[i] << std::endl;)
+        }
+    }
+    return 0;
+}
+
+// Reads the node m_path/<drillbit> and stores the endpoint of the
+// DrillServiceInstance found there into `endpoint`.
+//
+// Returns 0 on success. Returns -1, with getError() set, when the node cannot
+// be read, holds no data, or its content cannot be parsed.
+int ZookeeperClient::getEndPoint(const std::string& drillbit,
+        exec::DrillbitEndpoint& endpoint){
+    int rc = ZOK;
+    // Full path of the node describing the requested drillbit.
+    std::string s(m_path +  "/" + drillbit);
+    int buffer_len=MAX_CONNECT_STR;
+    char buffer[MAX_CONNECT_STR+1];
+    struct Stat stat;
+    buffer[MAX_CONNECT_STR]=0;
+    rc= zoo_get(p_zh.get(), s.c_str(), 0, buffer,  &buffer_len, &stat);
+    if(rc!=ZOK){
+        m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
+        return -1;
+    }
+    // zoo_get() reports a node without data as buffer_len == -1, and the
+    // payload may not be a valid message: neither may be used as an endpoint.
+    exec::DrillServiceInstance drillServiceInstance;
+    if(buffer_len<0
+            || !drillServiceInstance.ParseFromArray(buffer, buffer_len)){
+        m_err=getMessage(ERR_CONN_ZKDBITERR,
+                static_cast<int>(ZMARSHALLINGERROR));
+        return -1;
+    }
+    endpoint=drillServiceInstance.endpoint();
+
+    return 0;
+}
+
+// Gives up this client's reference to the ZooKeeper handle by replacing p_zh
+// with an empty pointer.
+void ZookeeperClient::close(){
+    p_zh = boost::shared_ptr<zhandle_t>();
+}
+
+} /* namespace Drill */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/clientlib/zookeeperClient.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/clientlib/zookeeperClient.hpp 
b/contrib/native/client/src/clientlib/zookeeperClient.hpp
new file mode 100644
index 0000000..25d6af5
--- /dev/null
+++ b/contrib/native/client/src/clientlib/zookeeperClient.hpp
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef _WIN32
+#include <zookeeper.h>
+#else
+#include <zookeeper/zookeeper.h>
+#endif
+
+#include <boost/shared_ptr.hpp>
+#include <boost/thread/condition_variable.hpp>
+#include <boost/thread/mutex.hpp>
+
+#include "UserBitShared.pb.h"
+
+
+#ifndef ZOOKEEPER_CLIENT_H
+#define ZOOKEEPER_CLIENT_H
+
+namespace Drill {
+// Small wrapper around the ZooKeeper C client used to list the drillbits
+// registered under a ZooKeeper path and to fetch their endpoints.
+class ZookeeperClient{
+    public:
+               // Path used when the constructor is not given one.
+               static std::string s_defaultDrillPath;
+
+        ZookeeperClient(const std::string& drillPath = s_defaultDrillPath);
+        ~ZookeeperClient();
+        // Maps the Drill client log level to a ZooKeeper log level.
+        static ZooLogLevel getZkLogLevel();
+        // Drops this client's reference to the ZooKeeper handle.
+        void close();
+        // Message describing the last failure recorded by this client.
+        const std::string& getError() const{return m_err;}
+        // return unshuffled list of drillbits
+        // connectStr: comma separated host:port pairs, each corresponding to
+        // a zk server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+        int getAllDrillbits(const std::string& connectStr, 
std::vector<std::string>& drillbits);
+        // looks up the named drillbit and returns the corresponding endpoint
+        // object
+        int getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& 
endpoint);
+
+        // Event handler invoked through the callback registered with
+        // zookeeper_init(); public so that the callback can reach it.
+        void watcher(zhandle_t *zzh, int type, int state, const char *path, 
void* context);
+
+    private:
+        // ZooKeeper handle; released with zookeeper_close.
+        boost::shared_ptr<zhandle_t> p_zh;
+        // Client id passed to zookeeper_init().
+        clientid_t m_id;
+        // Last state reported to watcher().
+        int m_state;
+        // Last error message; see getError().
+        std::string m_err;
+
+        // Protects m_bConnecting and is paired with m_cv.
+        boost::mutex m_cvMutex;
+        // Condition variable to signal connection callback has been processed
+        boost::condition_variable m_cv;
+        // True until watcher() has processed an event.
+        bool m_bConnecting;
+        // ZooKeeper path under which the drillbits are listed.
+        std::string m_path;
+
+};
+} /* namespace Drill */
+
+
+
+#endif /* ZOOKEEPER_CLIENT_H */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/collections.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/collections.hpp 
b/contrib/native/client/src/include/drill/collections.hpp
new file mode 100644
index 0000000..9fbfcc5
--- /dev/null
+++ b/contrib/native/client/src/include/drill/collections.hpp
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _DRILL_COLLECTIONS_H
+#define _DRILL_COLLECTIONS_H
+
+#include <iterator>
+
+#include <boost/noncopyable.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace Drill {
+namespace impl {
+
+/**
+ * Interface for internal iterators
+ *
+ * Abstract, non-copyable base class: concrete iterators implement
+ * dereference, pre-increment and comparison, plus a conversion to a shared
+ * pointer to the equivalent iterator over const T.
+ */
+template<typename T>
+class DrillIteratorImpl: private boost::noncopyable {
+public:
+       typedef DrillIteratorImpl<T> iterator;
+       typedef boost::shared_ptr<iterator> iterator_ptr;
+
+       typedef T value_type;
+       typedef value_type& reference;
+       typedef value_type* pointer;
+
+       virtual ~DrillIteratorImpl() {};
+
+       // To allow conversion from non-const to const types
+       virtual operator typename DrillIteratorImpl<const T>::iterator_ptr() 
const = 0;
+
+       // Access to the current element.
+       virtual reference operator*() const = 0;
+       virtual pointer   operator->() const = 0;
+
+       // Pre-increment; returns the iterator itself.
+       virtual iterator& operator++() = 0;
+
+       // Comparison with another iterator of the same element type.
+       virtual bool operator==(const iterator& x) const = 0;
+       virtual bool operator!=(const iterator& x) const = 0;
+};
+
+/**
+ * Interface for internal collections
+ *
+ * Abstract, non-copyable base class: concrete collections hand out shared
+ * pointers to DrillIteratorImpl objects for the start and end of the
+ * sequence, in mutable and const flavours.
+ */
+template<typename T>
+class DrillCollectionImpl: private boost::noncopyable {
+public:
+       // STL-like iterator typedef
+       typedef DrillIteratorImpl<T> iterator;
+       typedef boost::shared_ptr<iterator> iterator_ptr;
+       typedef DrillIteratorImpl<const T> const_iterator;
+       typedef boost::shared_ptr<const_iterator> const_iterator_ptr;
+
+       // STL-like value typedefs
+       typedef T value_type;
+       typedef value_type& reference;
+       typedef const value_type& const_reference;
+       typedef value_type* pointer;
+       typedef const value_type* const_pointer;
+       typedef int size_type;
+
+       virtual ~DrillCollectionImpl() {}
+
+       // Iterators over the sequence
+       virtual iterator_ptr begin() = 0;
+       virtual const_iterator_ptr begin() const = 0;
+       virtual iterator_ptr end() = 0;
+       virtual const_iterator_ptr end() const = 0;
+};
+} // namespace impl
+
+template<typename T>
+class DrillCollection;
+
+/**
+ * Input iterator handed out by DrillCollection<T>.
+ *
+ * Value-type wrapper around a shared impl::DrillIteratorImpl<T>: copies share
+ * the same implementation object. A default-constructed iterator holds no
+ * implementation; it can be copied, assigned, converted and compared, but
+ * must not be dereferenced or incremented.
+ */
+template<typename T>
+class DrillIterator: public std::iterator<std::input_iterator_tag, T> {
+public:
+       typedef impl::DrillIteratorImpl<T> Impl;
+       typedef boost::shared_ptr<Impl> ImplPtr;
+
+       typedef DrillIterator<T> iterator;
+       typedef std::iterator<std::input_iterator_tag, T> superclass;
+       typedef typename superclass::reference reference;
+       typedef typename superclass::pointer pointer;
+
+       // Default constructor
+       DrillIterator(): m_pImpl() {}
+       ~DrillIterator() {}
+
+       // Iterators are CopyConstructible and CopyAssignable
+       DrillIterator(const iterator& it): m_pImpl(it.m_pImpl) {}
+       iterator& operator=(const iterator& it) {
+               m_pImpl = it.m_pImpl;
+               return *this;
+       }
+
+       // Conversion from an iterator over another element type (non-const to
+       // const), through the implementation's conversion operator. An
+       // iterator without implementation converts to one without
+       // implementation instead of dereferencing a null pointer.
+       template<typename U>
+       DrillIterator(const DrillIterator<U>& it):
+               m_pImpl(it.m_pImpl ? ImplPtr(*it.m_pImpl) : ImplPtr()) {}
+
+       reference operator*() const { return m_pImpl->operator*(); }
+       pointer   operator->() const { return m_pImpl->operator->(); }
+
+       iterator& operator++() { m_pImpl->operator++(); return *this; }
+
+       // Two iterators are equal when they share the same implementation
+       // (including both having none) or when their implementations compare
+       // equal. An iterator with an implementation never equals one without.
+       bool operator==(const iterator& x) const {
+               if (m_pImpl == x.m_pImpl) {
+                       return true;
+               }
+               if (!m_pImpl || !x.m_pImpl) {
+                       return false;
+               }
+               return m_pImpl->operator==(*x.m_pImpl);
+       }
+
+       // Exact negation of the cases handled by operator==.
+       bool operator!=(const iterator& x) const {
+               if (m_pImpl == x.m_pImpl) {
+                       return false;
+               }
+               if (!m_pImpl || !x.m_pImpl) {
+                       return true;
+               }
+               return m_pImpl->operator!=(*x.m_pImpl);
+       }
+
+private:
+       template<typename U>
+       friend class DrillCollection;
+       template<typename U>
+       friend class DrillIterator;
+
+       ImplPtr m_pImpl;
+
+       // Wraps an implementation; reachable only from the friend templates.
+       template<typename U>
+       DrillIterator(const boost::shared_ptr<impl::DrillIteratorImpl<U> >&
+               pImpl): m_pImpl(pImpl) {}
+};
+
+/**
+ * Iterable facade over a shared impl::DrillCollectionImpl<T>.
+ *
+ * begin()/end() wrap the implementation's iterators into DrillIterator
+ * objects. The constructor and the implementation accessors are protected,
+ * so instances are created through derived classes.
+ */
+template<typename T>
+class DrillCollection {
+public:
+       typedef impl::DrillCollectionImpl<T> Impl;
+       typedef boost::shared_ptr<Impl> ImplPtr;
+
+       // STL-like iterator typedef
+       typedef DrillIterator<T> iterator;
+       typedef DrillIterator<const T> const_iterator;
+       typedef T value_type;
+       typedef value_type& reference;
+       typedef const value_type& const_reference;
+       typedef value_type* pointer;
+       typedef const value_type* const_pointer;
+       typedef int size_type;
+
+       // The const overloads view the implementation as const so that its
+       // const begin()/end() are the ones called.
+       iterator       begin()       { return iterator(m_pImpl->begin()); }
+       const_iterator begin() const { return 
const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->begin()); }
+       iterator       end()         { return iterator(m_pImpl->end()); }
+       const_iterator end() const   { return 
const_iterator(boost::const_pointer_cast<const Impl>(m_pImpl)->end()); }
+
+protected:
+       DrillCollection(const ImplPtr& impl): m_pImpl(impl) {}
+
+       // Direct access to the implementation for derived classes.
+       Impl& operator*() { return *m_pImpl; }
+       const Impl& operator*() const { return *m_pImpl; }
+       Impl* operator->() { return m_pImpl.get(); }
+       const Impl* operator->() const { return m_pImpl.get(); }
+
+private:
+       ImplPtr m_pImpl;
+};
+
+
+} /* namespace Drill */
+#endif /* _DRILL_COLLECTIONS_H */

http://git-wip-us.apache.org/repos/asf/drill/blob/166c4ce7/contrib/native/client/src/include/drill/common.hpp
----------------------------------------------------------------------
diff --git a/contrib/native/client/src/include/drill/common.hpp 
b/contrib/native/client/src/include/drill/common.hpp
index a617dc7..6d3816e 100644
--- a/contrib/native/client/src/include/drill/common.hpp
+++ b/contrib/native/client/src/include/drill/common.hpp
@@ -20,6 +20,24 @@
 #ifndef _COMMON_H_
 #define _COMMON_H_
 
+#if defined _WIN32 || defined __CYGWIN__
+  #ifdef DRILL_CLIENT_EXPORTS
+      #define DECLSPEC_DRILL_CLIENT __declspec(dllexport)
+  #else
+    #ifdef USE_STATIC_LIBDRILL
+      #define DECLSPEC_DRILL_CLIENT
+    #else
+      #define DECLSPEC_DRILL_CLIENT  __declspec(dllimport)
+    #endif
+  #endif
+#else
+  #if __GNUC__ >= 4
+    #define DECLSPEC_DRILL_CLIENT __attribute__ ((visibility ("default")))
+  #else
+    #define DECLSPEC_DRILL_CLIENT
+  #endif
+#endif
+
 #ifdef _WIN32
 // The order of inclusion is important. Including winsock2 before everything 
else
 // ensures that the correct typedefs are defined and that the older typedefs 
defined

Reply via email to