Github user parthchandra commented on a diff in the pull request:

    https://github.com/apache/drill/pull/950#discussion_r141725701
  
    --- Diff: contrib/native/client/src/clientlib/streamSocket.hpp ---
    @@ -0,0 +1,212 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +
    +#ifndef STREAMSOCKET_HPP
    +#define STREAMSOCKET_HPP
    +
    +#include "drill/common.hpp"
    +#include "env.h"
    +#include "wincert.ipp"
    +
    +#include <boost/asio.hpp>
    +#include <boost/asio/ssl.hpp>
    +
    +namespace Drill {
    +
    +typedef boost::asio::ip::tcp::socket::lowest_layer_type streamSocket_t;
    +typedef boost::asio::ssl::stream<boost::asio::ip::tcp::socket> 
sslTCPSocket_t;
    +typedef boost::asio::ip::tcp::socket basicTCPSocket_t;
    +
    +
    +// Some helper typedefs to define the highly templatized boost::asio 
methods
    +typedef boost::asio::const_buffers_1 ConstBufferSequence; 
    +typedef boost::asio::mutable_buffers_1 MutableBufferSequence;
    +
    +// ReadHandlers have different possible signatures.
    +//
    +// As a standard C-type callback
    +//    typedef void (*ReadHandler)(const boost::system::error_code& ec, 
std::size_t bytes_transferred);
    +//
    +// Or as a C++ functor
    +//    struct ReadHandler {
    +//        virtual void operator()(
    +//                const boost::system::error_code& ec,
    +//                std::size_t bytes_transferred) = 0;
    +//};
    +//
    +// We need a different signature though, since we need to pass in a member 
function of a drill client 
    +// class (which is C++), as a functor generated by boost::bind as a 
ReadHandler
    +// 
    +typedef boost::function<void (const boost::system::error_code& ec, 
std::size_t bytes_transferred) > ReadHandler;
    +
    +class AsioStreamSocket{
    +    public:
    +        virtual ~AsioStreamSocket(){};
    +        virtual streamSocket_t& getInnerSocket() = 0;
    +
    +        virtual std::size_t writeSome(
    +                const ConstBufferSequence& buffers,
    +                boost::system::error_code & ec) = 0;
    +
    +        virtual std::size_t readSome(
    +                const MutableBufferSequence& buffers,
    +                boost::system::error_code & ec) = 0;
    +
    +        //
    +        // boost::asio::async_read has the signature 
    +        // template<
    +        //     typename AsyncReadStream,
    +        //     typename MutableBufferSequence,
    +        //     typename ReadHandler>
    +        // void-or-deduced async_read(
    +        //     AsyncReadStream & s,
    +        //     const MutableBufferSequence & buffers,
    +        //     ReadHandler handler);
    +        //
    +        // For our use case, the derived class will have an instance of a 
concrete type for AsyncReadStream which 
    +        // will implement the requirements for the AsyncReadStream type. 
We need not pass that in as a parameter 
    +        // since the class already has the value
    +        // The method is templatized since the ReadHandler type is 
dependent on the class implementing the read 
    +        // handler (basically the class using the asio stream)
    +        //
    +        virtual void asyncRead( const MutableBufferSequence & buffers, 
ReadHandler handler) = 0;
    +
    +        // call the underlying protocol's handshake method.
    +        // if the useSystemConfig flag is true, then use properties read
    +        // from the underlying operating system
    +        virtual void protocolHandshake(bool useSystemConfig) = 0;
    +        virtual void protocolClose() = 0;
    +};
    +
    +class Socket: 
    +    public AsioStreamSocket, 
    +    public basicTCPSocket_t{
    +
    +    public:
    +        Socket(boost::asio::io_service& ioService) : 
basicTCPSocket_t(ioService) {
    +            }
    +
    +        ~Socket(){
    +            boost::system::error_code ignorederr;
    +            this->shutdown(boost::asio::ip::tcp::socket::shutdown_both, 
ignorederr);
    +            this->close();
    +        };
    +
    +        basicTCPSocket_t& getSocketStream(){ return *this;}
    +
    +        streamSocket_t& getInnerSocket(){ return this->lowest_layer();}
    +
    +        std::size_t writeSome(
    +                const ConstBufferSequence& buffers,
    +                boost::system::error_code & ec){
    +            return this->write_some(buffers, ec);
    +        }
    +
    +        std::size_t readSome(
    +                const MutableBufferSequence& buffers,
    +                boost::system::error_code & ec){
    +            return this->read_some(buffers, ec);
    +        }
    +
    +        void asyncRead( const MutableBufferSequence & buffers, ReadHandler 
handler){
    +            return async_read(*this, buffers, handler);
    +        }
    +
    +        void protocolHandshake(bool useSystemConfig){}; //nothing to do
    +        void protocolClose(){ 
    +            // shuts down the socket!
    +            boost::system::error_code ignorederr;
    +            
((basicTCPSocket_t*)this)->shutdown(boost::asio::ip::tcp::socket::shutdown_both,
    +                ignorederr
    +                );         
    +        } 
    +};
    +
    +
    +#if defined(IS_SSL_ENABLED)
    +
    +class SslSocket: 
    +    public AsioStreamSocket, 
    +    public sslTCPSocket_t{
    +
    +    public:
    +        SslSocket(boost::asio::io_service& ioService, 
boost::asio::ssl::context &sslContext) :
    +            sslTCPSocket_t(ioService, sslContext) {
    +            }
    +
    +        ~SslSocket(){};
    +
    +        sslTCPSocket_t& getSocketStream(){ return *this;}
    +
    +        streamSocket_t& getInnerSocket(){ return this->lowest_layer();}
    +
    +        std::size_t writeSome(
    +                const ConstBufferSequence& buffers,
    +                boost::system::error_code & ec){
    +            return this->write_some(buffers, ec);
    +        }
    +
    +        std::size_t readSome(
    +                const MutableBufferSequence& buffers,
    +                boost::system::error_code & ec){
    +            return this->read_some(buffers, ec);
    +        }
    +
    +        void asyncRead( const MutableBufferSequence & buffers, ReadHandler 
handler){
    +            return async_read(*this, buffers, handler);
    +        }
    +
    +        //
    +        // public method that can be invoked by callers to invoke the ssl 
handshake
    +        // throws: boost::system::system_error
    +        void protocolHandshake(bool useSystemConfig){
    +            if(useSystemConfig){
    +                if (loadSystemTrustStore(this->native_handle())) {
    +                    boost::system::error_code ec(EPROTO, 
boost::system::system_category());
    +                    boost::asio::detail::throw_error(ec, "Failed to load 
system trust store");
    +                }
    +            }
    +            
this->handshake(boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::client);
    +            return;
    +        };
    +        //
    +        // public method that can be invoked by callers to invoke a clean 
ssl shutdown
    +        // throws: boost::system::system_error
    +        void protocolClose(){
    +            try{
    +                this->shutdown();
    +            }catch(boost::system::system_error e){
    +                //swallow the exception. The channel is unusable anyway
    +            }
    +            // shuts down the socket!
    +            boost::system::error_code ignorederr;
    +            
this->lowest_layer().shutdown(boost::asio::ip::tcp::socket::shutdown_both,
    +                ignorederr
    --- End diff --
    
    My mistake. Thanks for catching this.


---

Reply via email to