Hi Marios,

Good stuff, though I have comments ;)

First off, some stylistic remarks:

      * make sure the first line of the commit message stays below 80
        characters
      * the same goes for some of the code; try to keep lines to 80
        chars or fewer.
      * please don't use tabs for indentation, since they'll look
        different for everybody. (I also cut whitespace at the end of
        lines; I am more than happy to share my Emacs setup that does
        all that automatically)
      * I generally dislike comments like 'end # each_key do' - if they
        are really necessary, it's a sign that the code probably needs
        to be split up

On Fri, 2011-05-20 at 19:09 +0300, [email protected] wrote:
> From: marios <[email protected]>
> 
> 
> Signed-off-by: marios <[email protected]>
> ---
>  server/lib/deltacloud/drivers/ec2/ec2_driver.rb |    4 +-
>  server/lib/deltacloud/helpers/blob_stream.rb    |  168 +++++++++++++++++++++++
>  server/server.rb                                |   40 +++++-
>  server/views/blobs/new.html.haml                |   14 +-
>  4 files changed, 210 insertions(+), 16 deletions(-)
> 
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 14c5829..a31d358 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -1,3 +1,5 @@
> +# Copyright (C) 2009, 2010  Red Hat, Inc.
> +#

This needs to be removed - it'll just lead to tears for the next
release.

>  # Licensed to the Apache Software Foundation (ASF) under one or more
>  # contributor license agreements.  See the NOTICE file distributed with
>  # this work for additional information regarding copyright ownership.  The
> @@ -396,7 +398,7 @@ module Deltacloud
>          end
>  
>          #--
> -        # Create Blob
> +        # Create Blob - NON Streaming way (i.e. was called with POST html 
> multipart form data)
>          #--
>          def create_blob(credentials, bucket_id, blob_id, data = nil, opts = 
> {})
>            s3_client = new_client(credentials, :s3)
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index fce14d0..a96b9ca 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -79,3 +79,171 @@ class Hash
>    end #def
>  
>  end #class
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put this 
> into a tempfile.
> +# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> 
> deltacloud =-->>STREAM-->> provider
> +# Instead we want to recognise that this is a 'Post blob' operation and 
> start streaming to the provider as
> +# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> 
> provider

Good that you added the comment - it should be formatted a little better
though to be more readable.

> +module Thin
> +  class Request
> +
> +    alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
> +    private
> +      def move_body_to_tempfile
> +        if BlobStreamIO::is_put_blob(self)
> +          @body = BlobStreamIO.new(self)
> +        else
> +          move_body_to_tempfile_orig
> +        end
> +      end
> +
> +  end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> +     class HTTP
> +
> +             alias :request_orig :request
> +
> +             @blob_req = nil # needs global scope for close op later

I don't think that does what you need it to do. This line is evaluated
once, when the class body is loaded, so it sets an instance variable on
the Net::HTTP class object itself, not on its instances. Newly created
Net::HTTP instances never see it; their @blob_req reads as nil anyway
until request assigns it.

Strictly speaking, there's no need to clear out @blob_req; but you could
do that in end_request, after calling end_transport. That is still
vulnerable to requests being interrupted before they reach that point.
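
To illustrate the scoping, here is a minimal sketch. The Connection
class and its method names are made up for the example and merely stand
in for Net::HTTP; none of this is code from the patch.

```ruby
# Hypothetical stand-in for Net::HTTP, only to show the scoping.
class Connection
  @blob_req = nil        # instance variable of the class object itself

  def start(req)
    @blob_req = req      # instance variable of this particular object
  end

  def finish
    @blob_req = nil      # clear once the transfer is done
  end

  def blob_req
    @blob_req            # unset instance variables simply read as nil
  end
end

conn = Connection.new
conn.instance_variable_defined?(:@blob_req)        # false: never set here
Connection.instance_variable_defined?(:@blob_req)  # true: set on the class
conn.start(:put_request)
conn.blob_req                                      # :put_request
conn.finish
conn.blob_req                                      # nil
```

The class-level assignment can therefore be dropped without changing
behaviour.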

> +             def request(req, body = nil, blob_stream = nil, &block)
> +                     unless blob_stream
> +                             return request_orig(req, body, &block)
> +                     end
> +                     @blob_req = req
> +                     do_start #start the connection
> +
> +      req.set_body_internal body
> +      begin_transport req
> +      req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> +                     @socket
> +             end
> +
> +             class Put < HTTPRequest
> +                     def write_header_m(sock, ver, path)
> +                             write_header(sock, ver, path)
> +                     end
> +             end
> +
> +             def end_request
> +         begin
> +         res = HTTPResponse.read_new(@socket)
> +      end while res.kind_of?(HTTPContinue)
> +      res.reading_body(@socket, @blob_req.response_body_permitted?) {
> +           yield res if block_given?
> +       }
> +     end_transport @blob_req, res
> +                     do_finish

Set @blob_req = nil here

> +                     res
> +             end
> +     end
> +
> +end
> +
> +
> +
> +
> +require 'aws'
> +require 'uri'
> +class BlobStreamIO
> +
> +  attr_accessor :size, :provider, :sock
> +
> +  def initialize(request)
> +             @request = request
> +    @size = 0
> +    provider = new_provider(request.driver.to_s)
> +    bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
> +             user, password = parse_args(request.env['HTTP_AUTHORIZATION'], 
> :credentials)
> +             content_type = request.env['CONTENT_TYPE']
> +             content_type ||= ""

Simpler: content_type = request.env['CONTENT_TYPE'] || ""

> +    @content_length = request.env['CONTENT_LENGTH']
> +             #user_meta = {}
> +             #meta_array = request.env.select{|k,v| 
> k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +             #meta_array.inject({}){ |result, array| 
> user_meta[array.first.upcase] = array.last}

Remove this if it isn't needed anymore

> +             uri = URI.parse(provider)
> +             timestamp = Time.now.httpdate
> +             string_to_sign = 
> "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
> +             auth_string = Aws::Utils::sign(password, string_to_sign)
> +             @http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
> +             @http.use_ssl = true
> +             @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> +             @provider_request = Net::HTTP::Put.new("/#{blob}")
> +             @provider_request['Host'] = "#{bucket}.#{uri.host}"
> +             @provider_request['Date'] = timestamp
> +             @provider_request['Content-Type'] = content_type
> +             @provider_request['Content-Length'] = @content_length
> +             @provider_request['Authorization'] = "AWS 
> #{user}:#{auth_string}"
> +             @provider_request['Expect'] = "100-continue"

Can we move some (or all) of this stuff into the drivers ?

> +#            @provider_request.body_stream = @buf
> +#            true is the flag for blob_stream
> +             @content_length = @content_length.to_i
> +             @sock = @http.request(@provider_request, nil, true)
> +  end
> +
> +  def << (data)
> +             @sock.write(data)
> +    @size += data.length
> +             if (@size >= @content_length)
> +                     result = @http.end_request
> +                     if result.inspect =~ (/Net::HTTPOK 200 OK/)

Is there really no better way to check that the request succeeded than
to depend on its string representation ?
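
One option is to test the class of the response, since Net::HTTP maps
status codes to response classes and every 2xx response descends from
Net::HTTPSuccess. A small sketch; the helper name blob_upload_ok? is
mine, and the responses are built by hand only so the example runs
without a network connection:

```ruby
require 'net/http'

# Hypothetical helper: true for any 2xx response from the provider.
def blob_upload_ok?(response)
  response.is_a?(Net::HTTPSuccess)
end

# Hand-built responses, standing in for what end_request would return.
ok     = Net::HTTPOK.new('1.1', '200', 'OK')
denied = Net::HTTPForbidden.new('1.1', '403', 'Forbidden')

blob_upload_ok?(ok)      # true
blob_upload_ok?(denied)  # false
```

Comparing result.code against "200" would also work, but the class
check accepts other success codes as well.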

> +                             @request.env["BLOB_SUCCESS"] = "true"
> +                     else
> +                             @request.env["BLOB_FAIL"] = result.body
> +                     end
> +             end
> +  end
> +
> +  def rewind
> +    puts "total size counted was #{size}"
> +  end

Is this left over from debugging ?

> +  #use the Request.env hash (populated by the ThinParser) to
> +  #determine whether this is a post blob operation
> +  #by definition, only get here with a body of > 112kbytes - 
> thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> +  #request.env['PATH_INFO']  === "/api/buckets/mynewcoolbucketwoo"
> +  def self.is_put_blob(request = nil)
> +    path = request.env['PATH_INFO']
> +    method = request.env['REQUEST_METHOD']
> +    if ( ((path =~ /^\/api\/buckets/i) == 0) &&
> +         ( method == 'PUT') )

=~ returns nil when there is no match, which is falsy in Ruby, and the
match position otherwise (0 here, and 0 is truthy in Ruby). So you can
write the above condition with fewer parens as

        if path =~ /^\/api\/buckets/ && method == 'PUT'

Also, the URLs in the rest of the API are case-sensitive, so there is
no need for the /.../i
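
Put together, the whole predicate could shrink to something like the
sketch below. The name put_blob? and the plain env hash argument are my
assumptions; the patch passes the Thin request object instead.

```ruby
# Sketch of the simplified check; takes the Rack-style env hash directly.
def put_blob?(env)
  path   = env['PATH_INFO']
  method = env['REQUEST_METHOD']
  # =~ gives 0 (truthy) for a match at the start, nil (falsy) otherwise;
  # the double negation turns the result into a plain true/false.
  !!(path =~ %r{^/api/buckets} && method == 'PUT')
end

put_blob?({ 'PATH_INFO' => '/api/buckets/b1/blob1',
            'REQUEST_METHOD' => 'PUT' })   # true
put_blob?({ 'PATH_INFO' => '/api/buckets/b1/blob1',
            'REQUEST_METHOD' => 'POST' })  # false
```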

> +      return true
> +    else
> +      return false
> +    end
> +  end
> +
> +  private
> +
> +  def parse_args(request_string, element)
> +    case element
> +      when :bucket
> +                             array = request_string.split("/")
> +                             blob = array.pop
> +                             bucket = array.pop
> +        return bucket, blob
> +      when :credentials
> +        decoded = Base64.decode64(request_string.split('Basic ').last)
> +        key = decoded.split(':').first
> +        pass = decoded.split(':').last
> +        return key, pass
> +      else
> +        nil
> +    end
> +  end

Why not make this two methods, parse_bucket and parse_credentials? The
two branches have nothing in common.
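
Roughly like this; a sketch only, assuming the same inputs as in the
patch (the PATH_INFO string and an HTTP Basic Authorization header).
Splitting the credentials with a limit of 2 is my addition, so that a
':' inside the password survives.

```ruby
require 'base64'

# The last two path segments are the bucket and the blob.
def parse_bucket(path)
  bucket, blob = path.split('/').last(2)
  return bucket, blob
end

# Decode an HTTP Basic Authorization header into key and password.
def parse_credentials(auth_header)
  decoded = Base64.decode64(auth_header.split('Basic ').last)
  # A limit of 2 keeps any ':' in the password intact.
  key, pass = decoded.split(':', 2)
  return key, pass
end

bucket, blob = parse_bucket('/api/buckets/mybucket/myblob')
# bucket is "mybucket", blob is "myblob"
```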

> +  def new_provider(driver_name)
> +    provider = case driver_name
> +      when (/EC2/) then "https://s3.amazonaws.com";
> +      when (/Rackspace/) then "Rax"
> +      when (/Azure/) then "Azure"
> +      else
> +        "UNKNOWN" # should blow something up here - noisily
> +    end
> +    provider
> +  end

Can't we just grab the driver/instantiate a new one and see whether it
supports streaming puts of a blob ? Going by the driver name seems
pretty fragile.
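
As a rough sketch of what I have in mind: ask the driver object whether
it can do this, rather than matching on its name. Everything below is
hypothetical, including the method name blob_stream_endpoint and both
driver classes; the real drivers have no such method today.

```ruby
# Raised when a driver offers no streaming upload (hypothetical).
class StreamingNotSupported < StandardError; end

# Hypothetical driver that knows its own streaming endpoint.
class S3LikeDriver
  def blob_stream_endpoint
    'https://s3.amazonaws.com'
  end
end

# Hypothetical driver without streaming support.
class OtherDriver
end

# Fail noisily instead of returning a placeholder like "UNKNOWN".
def streaming_endpoint(driver)
  unless driver.respond_to?(:blob_stream_endpoint)
    raise StreamingNotSupported, "#{driver.class} cannot stream blobs"
  end
  driver.blob_stream_endpoint
end

streaming_endpoint(S3LikeDriver.new)  # "https://s3.amazonaws.com"
```

That would also give the request signing code a natural home in the
driver.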

> diff --git a/server/server.rb b/server/server.rb
> index 86dd524..eef4218 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
>    end
>  end
>  
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put '/api/buckets/:bucket/:blob' do
> +     if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> +             content_type = env["CONTENT_TYPE"]
> +             content_type ||=  ""
> +             @blob = Blob.new({:id => params[:blob],
> +                               :bucket => params[:bucket],
> +                               :content_length => env["CONTENT_LENGTH"],
> +                               :content_type => content_type,
> +                               :last_modified => '',
> +                               :user_metadata => []}) #add metadata
> +#debugger
> +#      respond_to do |format|
> +#            format.xml { haml :"blobs/show" }
> +#    end

Debugging leftovers ? ;)

> +             @_format = :xml
> +             haml :"blobs/show"
> +     else
> +             report_error(500) #OK?
> +     end
> +end

Why isn't there a respond_to block here ?

> +#create a new blob using html interface - NON STREAMING (i.e. browser POST 
> http form data)
>  post '/api/buckets/:bucket' do
>    bucket_id = params[:bucket]
> -  blob_id = params['blob_id']
> +  blob_id = params['blob']
>    blob_data = params['blob_data']
>    user_meta = {}
>  #first try get blob_metadata from params (i.e., passed by http form post, 
> e.g. browser)
> @@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
>        key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
>        value = params[:"meta_value#{i}"]
>        user_meta[key] = value
> -    end #max.each do
> -  else #can try to get blob_metadata from http headers
> -    meta_array = request.env.select{|k,v| 
> k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> -    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = 
> array.last}
> -  end #end if
> +             end
> +  end #max.each do
> +#  else #can try to get blob_metadata from http headers - NO LONGER VALID 
> (Have seperate PUT for this type of metadata headers)
> +#    meta_array = request.env.select{|k,v| 
> k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +#    meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = 
> array.last}
> +#  end #end if
> +#  BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
>    @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, 
> user_meta)
>    respond_to do |format|
>      format.html { haml :"blobs/show"}

Why all these changes to the HTML UI ? It doesn't seem that streaming
blobs has any connection with this.

David

