Hi Marios,
Good stuff, though I have comments ;)
First off, some stylistic remarks:
* make sure the first line of the commit message stays below 80
characters
* the same goes for some of the code; try to keep lines to 80 chars or
less.
* please don't use tabs for indentation, since they'll look
different for everybody. (I also strip trailing whitespace; I'm
more than happy to share my Emacs setup, which does all of that
automatically.)
* I generally dislike comments like 'end # each_key do' - if they
are really necessary, it's a sign that the code probably needs
to be split up
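If you'd rather not depend on an editor setup, a tiny script can catch the
line-length, tab and trailing-whitespace problems before you send a patch.
A quick sketch (the script and its method name are made up, nothing like
it is in the tree):

```ruby
# Hypothetical pre-submit check, not part of Deltacloud: flags lines that
# are longer than 80 chars, indented with tabs, or end in whitespace.
MAX_LINE_LENGTH = 80

def style_problems(text)
  problems = []
  text.each_line.with_index(1) do |line, lineno|
    line = line.chomp
    problems << [lineno, :too_long] if line.length > MAX_LINE_LENGTH
    problems << [lineno, :tab_indent] if line =~ /\A\s*\t/
    problems << [lineno, :trailing_whitespace] if line =~ /[ \t]+\z/
  end
  problems
end

# Usage: ruby check_style.rb FILE...
if __FILE__ == $PROGRAM_NAME
  ARGV.each do |path|
    style_problems(File.read(path)).each do |lineno, kind|
      puts "#{path}:#{lineno}: #{kind}"
    end
  end
end
```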
On Fri, 2011-05-20 at 19:09 +0300, [email protected] wrote:
> From: marios <[email protected]>
>
>
> Signed-off-by: marios <[email protected]>
> ---
> server/lib/deltacloud/drivers/ec2/ec2_driver.rb | 4 +-
> server/lib/deltacloud/helpers/blob_stream.rb | 168 +++++++++++++++++++++++
> server/server.rb | 40 +++++-
> server/views/blobs/new.html.haml | 14 +-
> 4 files changed, 210 insertions(+), 16 deletions(-)
>
> diff --git a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> index 14c5829..a31d358 100644
> --- a/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> +++ b/server/lib/deltacloud/drivers/ec2/ec2_driver.rb
> @@ -1,3 +1,5 @@
> +# Copyright (C) 2009, 2010 Red Hat, Inc.
> +#
This needs to be removed; the file already carries the ASF license
header below, and adding the Red Hat copyright back will just lead to
tears for the next release.
> # Licensed to the Apache Software Foundation (ASF) under one or more
> # contributor license agreements. See the NOTICE file distributed with
> # this work for additional information regarding copyright ownership. The
> @@ -396,7 +398,7 @@ module Deltacloud
> end
>
> #--
> - # Create Blob
> + # Create Blob - NON Streaming way (i.e. was called with POST html multipart form data)
> #--
> def create_blob(credentials, bucket_id, blob_id, data = nil, opts = {})
> s3_client = new_client(credentials, :s3)
> diff --git a/server/lib/deltacloud/helpers/blob_stream.rb b/server/lib/deltacloud/helpers/blob_stream.rb
> index fce14d0..a96b9ca 100644
> --- a/server/lib/deltacloud/helpers/blob_stream.rb
> +++ b/server/lib/deltacloud/helpers/blob_stream.rb
> @@ -79,3 +79,171 @@ class Hash
> end #def
>
> end #class
> +
> +#Monkey patch for streaming blobs:
> +# Normally a client will upload a blob to deltacloud and thin will put this into a tempfile.
> +# Then deltacloud would stream up to the provider: client =-->>TEMP_FILE-->> deltacloud =-->>STREAM-->> provider
> +# Instead we want to recognise that this is a 'Post blob' operation and start streaming to the provider as
> +# the request is received: client =-->>STREAM-->> deltacloud =-->>STREAM-->> provider
Good that you added the comment, though it should be formatted a little
better to be more readable, e.g. wrapped at 80 characters.
> +module Thin
> + class Request
> +
> + alias_method :move_body_to_tempfile_orig, :move_body_to_tempfile
> + private
> + def move_body_to_tempfile
> + if BlobStreamIO::is_put_blob(self)
> + @body = BlobStreamIO.new(self)
> + else
> + move_body_to_tempfile_orig
> + end
> + end
> +
> + end
> +end
> +
> +require 'net/http'
> +#monkey patch for Net:HTTP
> +module Net
> + class HTTP
> +
> + alias :request_orig :request
> +
> + @blob_req = nil # needs global scope for close op later
I don't think that does what you need it to do. This code is evaluated
once, when the class is loaded, so it sets an instance variable on the
Net::HTTP class object itself, not on its instances. Newly created
Net::HTTP instances start out with @blob_req unset (i.e., nil) anyway,
so the line has no effect.
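To make that concrete, here's the same pattern on a stand-in class
(Connection is made up; it just mimics what the patch does to Net::HTTP):

```ruby
# Stand-in class, not Net::HTTP: an assignment in the class body sets an
# instance variable on the class object, not on objects created from it.
class Connection
  @blob_req = nil            # lives on Connection (the class object)

  def remember(req)
    @blob_req = req          # lives on this particular instance
  end

  def blob_req
    @blob_req
  end
end

conn = Connection.new
p Connection.instance_variables   # prints [:@blob_req]
p conn.instance_variables         # prints [] (nothing set on the instance)
conn.remember(:put_request)
p conn.blob_req                   # prints :put_request
```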
Strictly speaking, there's no need to clear out @blob_req, but you could
do that in end_request, after calling end_transport. That still leaves
it set if a request is interrupted before it reaches that point.
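If you do clear it, an ensure clause at least covers requests that die
with an exception. Roughly like this (StreamingConnection is a made-up
stand-in for the patched Net::HTTP; only the cleanup pattern matters):

```ruby
# Hypothetical stand-in for the patched Net::HTTP. The ensure clause runs
# whether end_request returns normally or raises, so @blob_req is always
# cleared; the method's return value is still the last body expression.
class StreamingConnection
  attr_reader :blob_req

  def start_request(req)
    @blob_req = req
  end

  def end_request
    raise IOError, "connection dropped" if @blob_req == :broken
    "response for #{@blob_req}"   # stands in for reading the real response
  ensure
    @blob_req = nil
  end
end
```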
> + def request(req, body = nil, blob_stream = nil, &block)
> + unless blob_stream
> + return request_orig(req, body, &block)
> + end
> + @blob_req = req
> + do_start #start the connection
> +
> + req.set_body_internal body
> + begin_transport req
> + req.write_header_m @socket,@curr_http_version, edit_path(req.path)
> + @socket
> + end
> +
> + class Put < HTTPRequest
> + def write_header_m(sock, ver, path)
> + write_header(sock, ver, path)
> + end
> + end
> +
> + def end_request
> + begin
> + res = HTTPResponse.read_new(@socket)
> + end while res.kind_of?(HTTPContinue)
> + res.reading_body(@socket, @blob_req.response_body_permitted?) {
> + yield res if block_given?
> + }
> + end_transport @blob_req, res
> + do_finish
Set @blob_req = nil here
> + res
> + end
> + end
> +
> +end
> +
> +
> +
> +
> +require 'aws'
> +require 'uri'
> +class BlobStreamIO
> +
> + attr_accessor :size, :provider, :sock
> +
> + def initialize(request)
> + @request = request
> + @size = 0
> + provider = new_provider(request.driver.to_s)
> + bucket, blob = parse_args(request.env["PATH_INFO"], :bucket)
> + user, password = parse_args(request.env['HTTP_AUTHORIZATION'], :credentials)
> + content_type = request.env['CONTENT_TYPE']
> + content_type ||= ""
Simpler: content_type = request.env['CONTENT_TYPE'] || ""
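For reference, `||` only falls back when the left side is nil (or
false), which is exactly the missing-header case (content_type_for is a
made-up helper name):

```ruby
# Hypothetical helper: default to "" when the client sent no Content-Type.
def content_type_for(env)
  env['CONTENT_TYPE'] || ""
end

content_type_for({'CONTENT_TYPE' => 'image/png'})  # => "image/png"
content_type_for({})                               # => ""
```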
> + @content_length = request.env['CONTENT_LENGTH']
> + #user_meta = {}
> + #meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> + #meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
Remove this if it isn't needed anymore
> + uri = URI.parse(provider)
> + timestamp = Time.now.httpdate
> + string_to_sign = "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
> + auth_string = Aws::Utils::sign(password, string_to_sign)
> + @http = Net::HTTP.new("#{bucket}.#{uri.host}", uri.port )
> + @http.use_ssl = true
> + @http.verify_mode = OpenSSL::SSL::VERIFY_NONE
> + @provider_request = Net::HTTP::Put.new("/#{blob}")
> + @provider_request['Host'] = "#{bucket}.#{uri.host}"
> + @provider_request['Date'] = timestamp
> + @provider_request['Content-Type'] = content_type
> + @provider_request['Content-Length'] = @content_length
> + @provider_request['Authorization'] = "AWS #{user}:#{auth_string}"
> + @provider_request['Expect'] = "100-continue"
Can we move some (or all) of this stuff into the drivers?
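Something along these lines, say a method the EC2 driver exposes so that
BlobStreamIO never has to know about S3 signing (the module and method
names are made up). I believe Aws::Utils::sign does the same
HMAC-SHA1/Base64 computation as below, but that's worth double-checking:

```ruby
require 'net/http'
require 'openssl'
require 'base64'
require 'time'

# Hypothetical driver-side hook: builds the signed S3 PUT that the patch
# currently assembles in BlobStreamIO#initialize (HMAC-SHA1 signature over
# the same string_to_sign).
module S3BlobStreaming
  S3_HOST = "s3.amazonaws.com"

  def self.blob_stream_request(user, password, bucket, blob,
                               content_type, content_length)
    timestamp = Time.now.httpdate
    string_to_sign =
      "PUT\n\n#{content_type}\n#{timestamp}\n/#{bucket}/#{blob}"
    signature = Base64.encode64(
      OpenSSL::HMAC.digest('sha1', password, string_to_sign)).chomp
    req = Net::HTTP::Put.new("/#{blob}")
    req['Host'] = "#{bucket}.#{S3_HOST}"
    req['Date'] = timestamp
    req['Content-Type'] = content_type
    req['Content-Length'] = content_length.to_s
    req['Authorization'] = "AWS #{user}:#{signature}"
    req['Expect'] = "100-continue"
    req
  end
end
```

BlobStreamIO would then only open the connection and write the body.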
> +# @provider_request.body_stream = @buf
> +# true is the flag for blob_stream
> + @content_length = @content_length.to_i
> + @sock = @http.request(@provider_request, nil, true)
> + end
> +
> + def << (data)
> + @sock.write(data)
> + @size += data.length
> + if (@size >= @content_length)
> + result = @http.end_request
> + if result.inspect =~ (/Net::HTTPOK 200 OK/)
Is there really no better way to check that the request succeeded than
to depend on its string representation?
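Net::HTTP returns a response subclass per status code, and all 2xx
responses are Net::HTTPSuccess, so a type check does the job; since S3
answers a successful PUT with 200, checking result.code == "200" would
work too (blob_put_succeeded? is just an illustrative name):

```ruby
require 'net/http'

# Every 2xx response class inherits from Net::HTTPSuccess.
def blob_put_succeeded?(response)
  response.is_a?(Net::HTTPSuccess)
end

ok = Net::HTTPOK.new('1.1', '200', 'OK')
forbidden = Net::HTTPForbidden.new('1.1', '403', 'Forbidden')
blob_put_succeeded?(ok)         # => true
blob_put_succeeded?(forbidden)  # => false
```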
> + @request.env["BLOB_SUCCESS"] = "true"
> + else
> + @request.env["BLOB_FAIL"] = result.body
> + end
> + end
> + end
> +
> + def rewind
> + puts "total size counted was #{size}"
> + end
Is this left over from debugging?
> + #use the Request.env hash (populated by the ThinParser) to
> + #determine whether this is a post blob operation
> + #by definition, only get here with a body of > 112kbytes - thin/lib/thin/request.rb:12 MAX_BODY = 1024 * (80 + 32)
> + #request.env['PATH_INFO'] === "/api/buckets/mynewcoolbucketwoo"
> + def self.is_put_blob(request = nil)
> + path = request.env['PATH_INFO']
> + method = request.env['REQUEST_METHOD']
> + if ( ((path =~ /^\/api\/buckets/i) == 0) &&
> + ( method == 'PUT') )
The =~ operator returns nil when there is no match, which Ruby treats as
false, so you can drop the comparison with 0 and most of the parens:
if path =~ /^\/api\/buckets/ && method == 'PUT'
Also, the URLs in the rest of the API are case-sensitive, so there's no
need for the /.../i
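Put together, and returning the condition directly instead of going
through if/else, it could look like this. Req is a stand-in for
Thin::Request (only #env is used), and I dropped the `request = nil`
default since the method can't work without a request:

```ruby
# Stand-in for Thin::Request, which exposes the Rack env hash as #env.
Req = Struct.new(:env)

class BlobStreamIO
  # A PUT under /api/buckets is a blob upload. =~ yields nil (falsy) on no
  # match; !! turns the match position into a real true/false.
  def self.is_put_blob(request)
    path = request.env['PATH_INFO']
    method = request.env['REQUEST_METHOD']
    !!(path =~ /^\/api\/buckets/) && method == 'PUT'
  end
end
```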
> + return true
> + else
> + return false
> + end
> + end
> +
> + private
> +
> + def parse_args(request_string, element)
> + case element
> + when :bucket
> + array = request_string.split("/")
> + blob = array.pop
> + bucket = array.pop
> + return bucket, blob
> + when :credentials
> + decoded = Base64.decode64(request_string.split('Basic ').last)
> + key = decoded.split(':').first
> + pass = decoded.split(':').last
> + return key, pass
> + else
> + nil
> + end
> + end
Why not make this two methods, parse_bucket and parse_credentials, since
the two branches have nothing in common?
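Roughly like this. As a side benefit, splitting the decoded credentials
with a limit of 2 keeps a password that contains ':' intact, which
split(':').last in the patch doesn't:

```ruby
require 'base64'

# "/api/buckets/<bucket>/<blob>" -> [bucket, blob]
def parse_bucket(path_info)
  *_, bucket, blob = path_info.split("/")
  [bucket, blob]
end

# "Basic <base64 of user:password>" -> [user, password]
def parse_credentials(authorization)
  decoded = Base64.decode64(authorization.sub(/\ABasic /, ''))
  decoded.split(':', 2)   # limit 2: only the first ':' separates user/pass
end
```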
> + def new_provider(driver_name)
> + provider = case driver_name
> + when (/EC2/) then "https://s3.amazonaws.com"
> + when (/Rackspace/) then "Rax"
> + when (/Azure/) then "Azure"
> + else
> + "UNKNOWN" # should blow something up here - noisily
> + end
> + provider
> + end
Can't we just grab the driver (or instantiate a new one) and see whether
it supports streaming puts of a blob? Going by the driver name seems
pretty fragile.
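Something like the following, where the Thin patch falls back to the
tempfile path whenever the check fails. The method name
blob_stream_connection and both driver classes are made up for
illustration:

```ruby
# Hypothetical capability check: ask the driver object whether it
# implements a streaming hook instead of matching on its class name.
class EC2Driver
  def blob_stream_connection(params)
    # would open the signed PUT to the provider and return the socket
  end
end

class MockDriver; end

def supports_blob_streaming?(driver)
  driver.respond_to?(:blob_stream_connection)
end

supports_blob_streaming?(EC2Driver.new)   # => true
supports_blob_streaming?(MockDriver.new)  # => false
```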
> diff --git a/server/server.rb b/server/server.rb
> index 86dd524..eef4218 100644
> --- a/server/server.rb
> +++ b/server/server.rb
> @@ -696,10 +696,32 @@ get '/api/buckets/:bucket/new_blob' do
> end
> end
>
> -#create a new blob
> +#create a new blob using PUT - streams through deltacloud
> +put '/api/buckets/:bucket/:blob' do
> + if(env["BLOB_SUCCESS"]) #ie got a 200ok after putting blob
> + content_type = env["CONTENT_TYPE"]
> + content_type ||= ""
> + @blob = Blob.new({:id => params[:blob],
> + :bucket => params[:bucket],
> + :content_length => env["CONTENT_LENGTH"],
> + :content_type => content_type,
> + :last_modified => '',
> + :user_metadata => []}) #add metadata
> +#debugger
> +# respond_to do |format|
> +# format.xml { haml :"blobs/show" }
> +# end
Debugging leftovers? ;)
> + @_format = :xml
> + haml :"blobs/show"
> + else
> + report_error(500) #OK?
> + end
> +end
Why isn't there a respond_to block here?
> +#create a new blob using html interface - NON STREAMING (i.e. browser POST http form data)
> post '/api/buckets/:bucket' do
> bucket_id = params[:bucket]
> - blob_id = params['blob_id']
> + blob_id = params['blob']
> blob_data = params['blob_data']
> user_meta = {}
> #first try get blob_metadata from params (i.e., passed by http form post, e.g. browser)
> @@ -710,11 +732,13 @@ post '/api/buckets/:bucket' do
> key = "HTTP_X_Deltacloud_Blobmeta_#{key}"
> value = params[:"meta_value#{i}"]
> user_meta[key] = value
> - end #max.each do
> - else #can try to get blob_metadata from http headers
> - meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> - meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> - end #end if
> + end
> + end #max.each do
> +# else #can try to get blob_metadata from http headers - NO LONGER VALID (Have seperate PUT for this type of metadata headers)
> +# meta_array = request.env.select{|k,v| k.match(/^HTTP[-_]X[-_]Deltacloud[-_]Blobmeta[-_]/i)}
> +# meta_array.inject({}){ |result, array| user_meta[array.first.upcase] = array.last}
> +# end #end if
> +# BlobPostStream.call(credentials, bucket_id, blob_id, env, user_meta)
> @blob = driver.create_blob(credentials, bucket_id, blob_id, blob_data, user_meta)
> respond_to do |format|
> format.html { haml :"blobs/show"}
Why all these changes to the HTML UI? They don't seem to have anything
to do with streaming blobs.
David