http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_event_impl.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_event_impl.h b/be/src/kudu/util/debug/trace_event_impl.h new file mode 100644 index 0000000..e5e85df --- /dev/null +++ b/be/src/kudu/util/debug/trace_event_impl.h @@ -0,0 +1,718 @@ +// Copyright (c) 2012 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#ifndef KUDU_UTIL_DEBUG_TRACE_EVENT_IMPL_H_ +#define KUDU_UTIL_DEBUG_TRACE_EVENT_IMPL_H_ + +#include <gtest/gtest_prod.h> +#include <stack> +#include <sstream> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + + +#include "kudu/gutil/atomicops.h" +#include "kudu/gutil/callback.h" +#include "kudu/gutil/walltime.h" +#include "kudu/gutil/ref_counted.h" +#include "kudu/gutil/ref_counted_memory.h" +#include "kudu/util/atomic.h" +#include "kudu/util/condition_variable.h" +#include "kudu/util/locks.h" +#include "kudu/util/thread.h" +#include "kudu/util/threadlocal.h" + +// Older style trace macros with explicit id and extra data +// Only these macros result in publishing data to ETW as currently implemented. +#define TRACE_EVENT_BEGIN_ETW(name, id, extra) \ + base::debug::TraceLog::AddTraceEventEtw( \ + TRACE_EVENT_PHASE_BEGIN, \ + name, reinterpret_cast<const void*>(id), extra) + +#define TRACE_EVENT_END_ETW(name, id, extra) \ + base::debug::TraceLog::AddTraceEventEtw( \ + TRACE_EVENT_PHASE_END, \ + name, reinterpret_cast<const void*>(id), extra) + +#define TRACE_EVENT_INSTANT_ETW(name, id, extra) \ + base::debug::TraceLog::AddTraceEventEtw( \ + TRACE_EVENT_PHASE_INSTANT, \ + name, reinterpret_cast<const void*>(id), extra) + +template <typename Type> +class Singleton; + +#if defined(COMPILER_GCC) +namespace BASE_HASH_NAMESPACE { +template <> +struct hash<kudu::Thread*> { + std::size_t operator()(kudu::Thread* value) const { + return reinterpret_cast<std::size_t>(value); + } +}; +} // BASE_HASH_NAMESPACE +#endif + +namespace kudu { +namespace debug { + +// For any argument of type TRACE_VALUE_TYPE_CONVERTABLE the provided +// class must implement this interface. +class ConvertableToTraceFormat : public kudu::RefCountedThreadSafe<ConvertableToTraceFormat> { + public: + // Append the class info to the provided |out| string. The appended + // data must be a valid JSON object. Strings must be properly quoted, and + // escaped. There is no processing applied to the content after it is + // appended. + virtual void AppendAsTraceFormat(std::string* out) const = 0; + + protected: + virtual ~ConvertableToTraceFormat() {} + + private: + friend class kudu::RefCountedThreadSafe<ConvertableToTraceFormat>; +}; + +struct TraceEventHandle { + uint32 chunk_seq; + uint16 chunk_index; + uint16 event_index; +}; + +const int kTraceMaxNumArgs = 2; + +class BASE_EXPORT TraceEvent { + public: + union TraceValue { + bool as_bool; + uint64_t as_uint; + long long as_int; + double as_double; + const void* as_pointer; + const char* as_string; + }; + + TraceEvent(); + ~TraceEvent(); + + // We don't need to copy TraceEvent except when TraceEventBuffer is cloned. + // Use explicit copy method to avoid accidentally misuse of copy. 
+ void CopyFrom(const TraceEvent& other); + + void Initialize( + int thread_id, + MicrosecondsInt64 timestamp, + MicrosecondsInt64 thread_timestamp, + char phase, + const unsigned char* category_group_enabled, + const char* name, + uint64_t id, + int num_args, + const char** arg_names, + const unsigned char* arg_types, + const uint64_t* arg_values, + const scoped_refptr<ConvertableToTraceFormat>* convertable_values, + unsigned char flags); + + void Reset(); + + void UpdateDuration(const MicrosecondsInt64& now, const MicrosecondsInt64& thread_now); + + // Serialize event data to JSON + void AppendAsJSON(std::string* out) const; + void AppendPrettyPrinted(std::ostringstream* out) const; + + static void AppendValueAsJSON(unsigned char type, + TraceValue value, + std::string* out); + + MicrosecondsInt64 timestamp() const { return timestamp_; } + MicrosecondsInt64 thread_timestamp() const { return thread_timestamp_; } + char phase() const { return phase_; } + int thread_id() const { return thread_id_; } + MicrosecondsInt64 duration() const { return duration_; } + MicrosecondsInt64 thread_duration() const { return thread_duration_; } + uint64_t id() const { return id_; } + unsigned char flags() const { return flags_; } + + // Exposed for unittesting: + + const kudu::RefCountedString* parameter_copy_storage() const { + return parameter_copy_storage_.get(); + } + + const unsigned char* category_group_enabled() const { + return category_group_enabled_; + } + + const char* name() const { return name_; } + +#if defined(OS_ANDROID) + void SendToATrace(); +#endif + + private: + // Note: these are ordered by size (largest first) for optimal packing. + MicrosecondsInt64 timestamp_; + MicrosecondsInt64 thread_timestamp_; + MicrosecondsInt64 duration_; + MicrosecondsInt64 thread_duration_; + // id_ can be used to store phase-specific data. + uint64_t id_; + TraceValue arg_values_[kTraceMaxNumArgs]; + const char* arg_names_[kTraceMaxNumArgs]; + scoped_refptr<ConvertableToTraceFormat> convertable_values_[kTraceMaxNumArgs]; + const unsigned char* category_group_enabled_; + const char* name_; + scoped_refptr<kudu::RefCountedString> parameter_copy_storage_; + int thread_id_; + char phase_; + unsigned char flags_; + unsigned char arg_types_[kTraceMaxNumArgs]; + + DISALLOW_COPY_AND_ASSIGN(TraceEvent); +}; + +// TraceBufferChunk is the basic unit of TraceBuffer. +class BASE_EXPORT TraceBufferChunk { + public: + TraceBufferChunk(uint32 seq) + : next_free_(0), + seq_(seq) { + } + + void Reset(uint32 new_seq); + TraceEvent* AddTraceEvent(size_t* event_index); + bool IsFull() const { return next_free_ == kTraceBufferChunkSize; } + + uint32 seq() const { return seq_; } + size_t capacity() const { return kTraceBufferChunkSize; } + size_t size() const { return next_free_; } + + TraceEvent* GetEventAt(size_t index) { + DCHECK(index < size()); + return &chunk_[index]; + } + const TraceEvent* GetEventAt(size_t index) const { + DCHECK(index < size()); + return &chunk_[index]; + } + + gscoped_ptr<TraceBufferChunk> Clone() const; + + static const size_t kTraceBufferChunkSize = 64; + + private: + size_t next_free_; + TraceEvent chunk_[kTraceBufferChunkSize]; + uint32 seq_; +}; + +// TraceBuffer holds the events as they are collected. 
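// As a rough sketch (illustrative only; it assumes NextChunk() returns NULL
// once the single pass over the buffer is exhausted, and 'logged_events' is a
// hypothetical TraceBuffer*), a consumer of this interface walks a cloned
// buffer chunk by chunk and serializes each event:
//
//   gscoped_ptr<TraceBuffer> snapshot = logged_events->CloneForIteration();
//   while (const TraceBufferChunk* chunk = snapshot->NextChunk()) {
//     for (size_t i = 0; i < chunk->size(); i++) {
//       std::string json;
//       chunk->GetEventAt(i)->AppendAsJSON(&json);
//     }
//   }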
+class BASE_EXPORT TraceBuffer { + public: + virtual ~TraceBuffer() {} + + virtual gscoped_ptr<TraceBufferChunk> GetChunk(size_t *index) = 0; + virtual void ReturnChunk(size_t index, + gscoped_ptr<TraceBufferChunk> chunk) = 0; + + virtual bool IsFull() const = 0; + virtual size_t Size() const = 0; + virtual size_t Capacity() const = 0; + virtual TraceEvent* GetEventByHandle(TraceEventHandle handle) = 0; + + // For iteration. Each TraceBuffer can only be iterated once. + virtual const TraceBufferChunk* NextChunk() = 0; + + virtual gscoped_ptr<TraceBuffer> CloneForIteration() const = 0; +}; + +// TraceResultBuffer collects and converts trace fragments returned by TraceLog +// to JSON output. +class TraceResultBuffer { + public: + static std::string FlushTraceLogToString(); + static std::string FlushTraceLogToStringButLeaveBufferIntact(); + + private: + TraceResultBuffer(); + ~TraceResultBuffer(); + + static std::string DoFlush(bool leave_intact); + + // Callback for TraceLog::Flush + void Collect(const scoped_refptr<RefCountedString>& s, + bool has_more_events); + + bool first_; + std::string json_; +}; + +class BASE_EXPORT CategoryFilter { + public: + typedef std::vector<std::string> StringList; + + // The default category filter, used when none is provided. + // Allows all categories through, except if they end in the suffix 'Debug' or + // 'Test'. + static const char* kDefaultCategoryFilterString; + + // |filter_string| is a comma-delimited list of category wildcards. + // A category can have an optional '-' prefix to make it an excluded category. + // All the same rules apply above, so for example, having both included and + // excluded categories in the same list would not be supported. + // + // Example: CategoryFilter"test_MyTest*"); + // Example: CategoryFilter("test_MyTest*,test_OtherStuff"); + // Example: CategoryFilter("-excluded_category1,-excluded_category2"); + // Example: CategoryFilter("-*,webkit"); would disable everything but webkit. + // Example: CategoryFilter("-webkit"); would enable everything but webkit. + // + // Category filters can also be used to configure synthetic delays. + // + // Example: CategoryFilter("DELAY(gpu.PresentingFrame;16)"); would make swap + // buffers always take at least 16 ms. + // Example: CategoryFilter("DELAY(gpu.PresentingFrame;16;oneshot)"); would + // make swap buffers take at least 16 ms the first time it is + // called. + // Example: CategoryFilter("DELAY(gpu.PresentingFrame;16;alternating)"); + // would make swap buffers take at least 16 ms every other time it + // is called. + explicit CategoryFilter(const std::string& filter_string); + + CategoryFilter(const CategoryFilter& cf); + + ~CategoryFilter(); + + CategoryFilter& operator=(const CategoryFilter& rhs); + + // Writes the string representation of the CategoryFilter. This is a comma + // separated string, similar in nature to the one used to determine + // enabled/disabled category patterns, except here there is an arbitrary + // order, included categories go first, then excluded categories. Excluded + // categories are distinguished from included categories by the prefix '-'. + std::string ToString() const; + + // Determines whether category group would be enabled or + // disabled by this category filter. + bool IsCategoryGroupEnabled(const char* category_group) const; + + // Return a list of the synthetic delays specified in this category filter. 
+ const StringList& GetSyntheticDelayValues() const; + + // Merges nested_filter with the current CategoryFilter + void Merge(const CategoryFilter& nested_filter); + + // Clears both included/excluded pattern lists. This would be equivalent to + // creating a CategoryFilter with an empty string, through the constructor. + // i.e: CategoryFilter(""). + // + // When using an empty filter, all categories are considered included as we + // are not excluding anything. + void Clear(); + + private: + FRIEND_TEST(TraceEventTestFixture, CategoryFilter); + + static bool IsEmptyOrContainsLeadingOrTrailingWhitespace( + const std::string& str); + + void Initialize(const std::string& filter_string); + void WriteString(const StringList& values, + std::string* out, + bool included) const; + void WriteString(const StringList& delays, std::string* out) const; + bool HasIncludedPatterns() const; + + bool DoesCategoryGroupContainCategory(const char* category_group, + const char* category) const; + + StringList included_; + StringList disabled_; + StringList excluded_; + StringList delays_; +}; + +class TraceSamplingThread; + +class BASE_EXPORT TraceLog { + public: + enum Mode { + DISABLED = 0, + RECORDING_MODE, + MONITORING_MODE, + }; + + // Options determines how the trace buffer stores data. + enum Options { + // Record until the trace buffer is full. + RECORD_UNTIL_FULL = 1 << 0, + + // Record until the user ends the trace. The trace buffer is a fixed size + // and we use it as a ring buffer during recording. + RECORD_CONTINUOUSLY = 1 << 1, + + // Enable the sampling profiler in the recording mode. + ENABLE_SAMPLING = 1 << 2, + + // Echo to console. Events are discarded. + ECHO_TO_CONSOLE = 1 << 3, + }; + + // The pointer returned from GetCategoryGroupEnabledInternal() points to a + // value with zero or more of the following bits. Used in this class only. + // The TRACE_EVENT macros should only use the value as a bool. + // These values must be in sync with macro values in TraceEvent.h in Blink. + enum CategoryGroupEnabledFlags { + // Category group enabled for the recording mode. + ENABLED_FOR_RECORDING = 1 << 0, + // Category group enabled for the monitoring mode. + ENABLED_FOR_MONITORING = 1 << 1, + // Category group enabled by SetEventCallbackEnabled(). + ENABLED_FOR_EVENT_CALLBACK = 1 << 2, + }; + + static TraceLog* GetInstance(); + + // Get set of known category groups. This can change as new code paths are + // reached. The known category groups are inserted into |category_groups|. + void GetKnownCategoryGroups(std::vector<std::string>* category_groups); + + // Retrieves a copy (for thread-safety) of the current CategoryFilter. + CategoryFilter GetCurrentCategoryFilter(); + + Options trace_options() const { + return static_cast<Options>(base::subtle::NoBarrier_Load(&trace_options_)); + } + + // Enables normal tracing (recording trace events in the trace buffer). + // See CategoryFilter comments for details on how to control what categories + // will be traced. If tracing has already been enabled, |category_filter| will + // be merged into the current category filter. + void SetEnabled(const CategoryFilter& category_filter, + Mode mode, Options options); + + // Disables normal tracing for all categories. + void SetDisabled(); + + bool IsEnabled() { return mode_ != DISABLED; } + + // The number of times we have begun recording traces. If tracing is off, + // returns -1. If tracing is on, then it returns the number of times we have + // recorded a trace. 
By watching for this number to increment, you can + // passively discover when a new trace has begun. This is then used to + // implement the TRACE_EVENT_IS_NEW_TRACE() primitive. + int GetNumTracesRecorded(); + +#if defined(OS_ANDROID) + void StartATrace(); + void StopATrace(); + void AddClockSyncMetadataEvent(); +#endif + + // Enabled state listeners give a callback when tracing is enabled or + // disabled. This can be used to tie into other library's tracing systems + // on-demand. + class EnabledStateObserver { + public: + // Called just after the tracing system becomes enabled, outside of the + // |lock_|. TraceLog::IsEnabled() is true at this point. + virtual void OnTraceLogEnabled() = 0; + + // Called just after the tracing system disables, outside of the |lock_|. + // TraceLog::IsEnabled() is false at this point. + virtual void OnTraceLogDisabled() = 0; + }; + void AddEnabledStateObserver(EnabledStateObserver* listener); + void RemoveEnabledStateObserver(EnabledStateObserver* listener); + bool HasEnabledStateObserver(EnabledStateObserver* listener) const; + + float GetBufferPercentFull() const; + bool BufferIsFull() const; + + // Not using kudu::Callback because of its limited by 7 parameters. + // Also, using primitive type allows directly passing callback from WebCore. + // WARNING: It is possible for the previously set callback to be called + // after a call to SetEventCallbackEnabled() that replaces or a call to + // SetEventCallbackDisabled() that disables the callback. + // This callback may be invoked on any thread. + // For TRACE_EVENT_PHASE_COMPLETE events, the client will still receive pairs + // of TRACE_EVENT_PHASE_BEGIN and TRACE_EVENT_PHASE_END events to keep the + // interface simple. + typedef void (*EventCallback)(MicrosecondsInt64 timestamp, + char phase, + const unsigned char* category_group_enabled, + const char* name, + uint64_t id, + int num_args, + const char* const arg_names[], + const unsigned char arg_types[], + const uint64_t arg_values[], + unsigned char flags); + + // Enable tracing for EventCallback. + void SetEventCallbackEnabled(const CategoryFilter& category_filter, + EventCallback cb); + void SetEventCallbackDisabled(); + + // Flush all collected events to the given output callback. The callback will + // be called one or more times synchronously from + // the current thread with IPC-bite-size chunks. The string format is + // undefined. Use TraceResultBuffer to convert one or more trace strings to + // JSON. The callback can be null if the caller doesn't want any data. + // Due to the implementation of thread-local buffers, flush can't be + // done when tracing is enabled. If called when tracing is enabled, the + // callback will be called directly with (empty_string, false) to indicate + // the end of this unsuccessful flush. + typedef kudu::Callback<void(const scoped_refptr<kudu::RefCountedString>&, + bool has_more_events)> OutputCallback; + void Flush(const OutputCallback& cb); + void FlushButLeaveBufferIntact(const OutputCallback& flush_output_callback); + + // Called by TRACE_EVENT* macros, don't call this directly. + // The name parameter is a category group for example: + // TRACE_EVENT0("renderer,webkit", "WebViewImpl::HandleInputEvent") + static const unsigned char* GetCategoryGroupEnabled(const char* name); + static const char* GetCategoryGroupName( + const unsigned char* category_group_enabled); + + // Called by TRACE_EVENT* macros, don't call this directly. 
+ // If |copy| is set, |name|, |arg_name1| and |arg_name2| will be deep copied + // into the event; see "Memory scoping note" and TRACE_EVENT_COPY_XXX above. + TraceEventHandle AddTraceEvent( + char phase, + const unsigned char* category_group_enabled, + const char* name, + uint64_t id, + int num_args, + const char** arg_names, + const unsigned char* arg_types, + const uint64_t* arg_values, + const scoped_refptr<ConvertableToTraceFormat>* convertable_values, + unsigned char flags); + TraceEventHandle AddTraceEventWithThreadIdAndTimestamp( + char phase, + const unsigned char* category_group_enabled, + const char* name, + uint64_t id, + int thread_id, + const MicrosecondsInt64& timestamp, + int num_args, + const char** arg_names, + const unsigned char* arg_types, + const uint64_t* arg_values, + const scoped_refptr<ConvertableToTraceFormat>* convertable_values, + unsigned char flags); + static void AddTraceEventEtw(char phase, + const char* category_group, + const void* id, + const char* extra); + static void AddTraceEventEtw(char phase, + const char* category_group, + const void* id, + const std::string& extra); + + void UpdateTraceEventDuration(const unsigned char* category_group_enabled, + const char* name, + TraceEventHandle handle); + + // For every matching event, the callback will be called. + typedef kudu::Callback<void()> WatchEventCallback; + void SetWatchEvent(const std::string& category_name, + const std::string& event_name, + const WatchEventCallback& callback); + // Cancel the watch event. If tracing is enabled, this may race with the + // watch event notification firing. + void CancelWatchEvent(); + + int process_id() const { return process_id_; } + + // Allow tests to inspect TraceEvents. + size_t GetEventsSize() const { return logged_events_->Size(); } + TraceEvent* GetEventByHandle(TraceEventHandle handle); + + void SetProcessID(int process_id); + + // Process sort indices, if set, override the order of a process will appear + // relative to other processes in the trace viewer. Processes are sorted first + // on their sort index, ascending, then by their name, and then tid. + void SetProcessSortIndex(int sort_index); + + // Sets the name of the process. + void SetProcessName(const std::string& process_name); + + // Processes can have labels in addition to their names. Use labels, for + // instance, to list out the web page titles that a process is handling. + void UpdateProcessLabel(int label_id, const std::string& current_label); + void RemoveProcessLabel(int label_id); + + // Thread sort indices, if set, override the order of a thread will appear + // within its process in the trace viewer. Threads are sorted first on their + // sort index, ascending, then by their name, and then tid. + void SetThreadSortIndex(int64_t tid , int sort_index); + + // Allow setting an offset between the current MicrosecondsInt64 time and the time + // that should be reported. + void SetTimeOffset(MicrosecondsInt64 offset); + + size_t GetObserverCountForTest() const; + + + private: + FRIEND_TEST(TraceEventTestFixture, + TraceBufferRingBufferGetReturnChunk); + FRIEND_TEST(TraceEventTestFixture, + TraceBufferRingBufferHalfIteration); + FRIEND_TEST(TraceEventTestFixture, + TraceBufferRingBufferFullIteration); + + // This allows constructor and destructor to be private and usable only + // by the Singleton class. + friend class Singleton<TraceLog>; + + // Enable/disable each category group based on the current mode_, + // category_filter_, event_callback_ and event_callback_category_filter_. 
+ // Enable the category group in the enabled mode if category_filter_ matches + // the category group, or event_callback_ is not null and + // event_callback_category_filter_ matches the category group. + void UpdateCategoryGroupEnabledFlags(); + void UpdateCategoryGroupEnabledFlag(int category_index); + + // Configure synthetic delays based on the values set in the current + // category filter. + void UpdateSyntheticDelaysFromCategoryFilter(); + + struct PerThreadInfo; + class OptionalAutoLock; + class ThreadLocalEventBuffer; + + TraceLog(); + ~TraceLog(); + const unsigned char* GetCategoryGroupEnabledInternal(const char* name); + void AddMetadataEventsWhileLocked(); + + TraceBuffer* trace_buffer() const { return logged_events_.get(); } + TraceBuffer* CreateTraceBuffer(); + + std::string EventToConsoleMessage(unsigned char phase, + const MicrosecondsInt64& timestamp, + TraceEvent* trace_event); + + TraceEvent* AddEventToThreadSharedChunkWhileLocked(TraceEventHandle* handle, + bool check_buffer_is_full); + void CheckIfBufferIsFullWhileLocked(); + void SetDisabledWhileLocked(); + + TraceEvent* GetEventByHandleInternal(TraceEventHandle handle, + OptionalAutoLock* lock); + + void ConvertTraceEventsToTraceFormat(gscoped_ptr<TraceBuffer> logged_events, + const OutputCallback& flush_output_callback); + void FinishFlush(int generation, + const OutputCallback& flush_output_callback); + + // Called when a thread which has registered trace events is about to exit. + void ThreadExiting(); + + int generation() const { + return static_cast<int>(base::subtle::NoBarrier_Load(&generation_)); + } + bool CheckGeneration(int generation) const { + return generation == this->generation(); + } + void UseNextTraceBuffer(); + + MicrosecondsInt64 OffsetNow() const { + return OffsetTimestamp(GetMonoTimeMicros()); + } + MicrosecondsInt64 OffsetTimestamp(const MicrosecondsInt64& timestamp) const { + return timestamp - time_offset_; + } + + // Create a new PerThreadInfo object for the current thread, + // and register it in the active_threads_ list. + PerThreadInfo* SetupThreadLocalBuffer(); + + // This lock protects TraceLog member accesses (except for members protected + // by thread_info_lock_) from arbitrary threads. + mutable base::SpinLock lock_; + // This lock protects accesses to thread_names_, thread_event_start_times_ + // and thread_colors_. + base::SpinLock thread_info_lock_; + int locked_line_; + Mode mode_; + int num_traces_recorded_; + gscoped_ptr<TraceBuffer> logged_events_; + AtomicWord /* EventCallback */ event_callback_; + bool dispatching_to_observer_list_; + std::vector<EnabledStateObserver*> enabled_state_observer_list_; + + std::string process_name_; + std::unordered_map<int, std::string> process_labels_; + int process_sort_index_; + std::unordered_map<int, int> thread_sort_indices_; + std::unordered_map<int, std::string> thread_names_; + + // The following two maps are used only when ECHO_TO_CONSOLE. + std::unordered_map<int, std::stack<MicrosecondsInt64> > thread_event_start_times_; + std::unordered_map<std::string, int> thread_colors_; + + // XORed with TraceID to make it unlikely to collide with other processes. + uint64_t process_id_hash_; + + int process_id_; + + MicrosecondsInt64 time_offset_; + + // Allow tests to wake up when certain events occur. + WatchEventCallback watch_event_callback_; + AtomicWord /* const unsigned char* */ watch_category_; + std::string watch_event_name_; + + AtomicWord /* Options */ trace_options_; + + // Sampling thread handles. 
+ gscoped_ptr<TraceSamplingThread> sampling_thread_; + scoped_refptr<kudu::Thread> sampling_thread_handle_; + + CategoryFilter category_filter_; + CategoryFilter event_callback_category_filter_; + + struct PerThreadInfo { + ThreadLocalEventBuffer* event_buffer_; + base::subtle::Atomic32 is_in_trace_event_; + + // Atomically take the event_buffer_ member, setting it to NULL. + // Returns the old value of the member. + ThreadLocalEventBuffer* AtomicTakeBuffer(); + }; + static __thread PerThreadInfo* thread_local_info_; + + Mutex active_threads_lock_; + // Map of PID -> PerThreadInfo + // Protected by active_threads_lock_. + typedef std::unordered_map<int64_t, PerThreadInfo*> ActiveThreadMap; + ActiveThreadMap active_threads_; + + // For events which can't be added into the thread local buffer, e.g. events + // from threads without a message loop. + gscoped_ptr<TraceBufferChunk> thread_shared_chunk_; + size_t thread_shared_chunk_index_; + + // The generation is incremented whenever tracing is enabled, and incremented + // again when the buffers are flushed. This ensures that trace events logged + // for a previous tracing session do not get accidentally flushed in the + // next tracing session. + AtomicWord generation_; + + DISALLOW_COPY_AND_ASSIGN(TraceLog); +}; + +} // namespace debug +} // namespace kudu + +#endif // KUDU_UTIL_DEBUG_TRACE_EVENT_IMPL_H_
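As a minimal usage sketch of the interface declared above (illustrative only, not part of the imported patch; it assumes the TRACE_EVENT0 macro from trace_event.h and elides error handling), a caller enables recording, emits events through the macros, disables recording, and then converts the buffered events to JSON:

  using kudu::debug::CategoryFilter;
  using kudu::debug::TraceLog;
  using kudu::debug::TraceResultBuffer;

  // Record only the "my_subsystem" category; stop once the trace buffer is full.
  TraceLog::GetInstance()->SetEnabled(CategoryFilter("my_subsystem"),
                                      TraceLog::RECORDING_MODE,
                                      TraceLog::RECORD_UNTIL_FULL);
  {
    // Category and event names here are hypothetical.
    TRACE_EVENT0("my_subsystem", "DoSomething");
    // ... traced work ...
  }
  // Flushing requires tracing to be disabled first (see Flush() above).
  TraceLog::GetInstance()->SetDisabled();
  std::string json = TraceResultBuffer::FlushTraceLogToString();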
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_event_impl_constants.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_event_impl_constants.cc b/be/src/kudu/util/debug/trace_event_impl_constants.cc new file mode 100644 index 0000000..bf45ed7 --- /dev/null +++ b/be/src/kudu/util/debug/trace_event_impl_constants.cc @@ -0,0 +1,14 @@ +// Copyright (c) 2013 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +#include "kudu/util/debug/trace_event_impl.h" + +namespace kudu { +namespace debug { + +// Enable everything but debug and test categories by default. +const char* CategoryFilter::kDefaultCategoryFilterString = "-*Debug,-*Test"; + +} // namespace debug +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_event_memory.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_event_memory.h b/be/src/kudu/util/debug/trace_event_memory.h new file mode 100644 index 0000000..6d9cf8d --- /dev/null +++ b/be/src/kudu/util/debug/trace_event_memory.h @@ -0,0 +1,28 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef KUDU_DEBUG_TRACE_EVENT_MEMORY_H +#define KUDU_DEBUG_TRACE_EVENT_MEMORY_H + +// Stub for this part of chromium tracing we haven't yet +// imported. +// The Chromium code relies on a locally patch tcmalloc. +// See 5bc71bae28ea03689dbf50fe6baa15b574319091 in the Chromium +// repository. + +#define INTERNAL_TRACE_MEMORY(category_group, name) + +#endif /* KUDU_DEBUG_TRACE_EVENT_MEMORY_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_event_synthetic_delay.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_event_synthetic_delay.cc b/be/src/kudu/util/debug/trace_event_synthetic_delay.cc new file mode 100644 index 0000000..0fff9fb --- /dev/null +++ b/be/src/kudu/util/debug/trace_event_synthetic_delay.cc @@ -0,0 +1,230 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
+ +#include "kudu/gutil/singleton.h" +#include "kudu/util/debug/trace_event_synthetic_delay.h" + +namespace { +const int kMaxSyntheticDelays = 32; +} // namespace + +namespace kudu { +namespace debug { + +TraceEventSyntheticDelayClock::TraceEventSyntheticDelayClock() {} +TraceEventSyntheticDelayClock::~TraceEventSyntheticDelayClock() {} + +class TraceEventSyntheticDelayRegistry : public TraceEventSyntheticDelayClock { + public: + static TraceEventSyntheticDelayRegistry* GetInstance(); + + TraceEventSyntheticDelay* GetOrCreateDelay(const char* name); + void ResetAllDelays(); + + // TraceEventSyntheticDelayClock implementation. + virtual MonoTime Now() OVERRIDE; + + private: + TraceEventSyntheticDelayRegistry(); + + friend class Singleton<TraceEventSyntheticDelayRegistry>; + + Mutex lock_; + TraceEventSyntheticDelay delays_[kMaxSyntheticDelays]; + TraceEventSyntheticDelay dummy_delay_; + base::subtle::Atomic32 delay_count_; + + DISALLOW_COPY_AND_ASSIGN(TraceEventSyntheticDelayRegistry); +}; + +TraceEventSyntheticDelay::TraceEventSyntheticDelay() + : mode_(STATIC), begin_count_(0), trigger_count_(0), clock_(nullptr) {} + +TraceEventSyntheticDelay::~TraceEventSyntheticDelay() {} + +TraceEventSyntheticDelay* TraceEventSyntheticDelay::Lookup( + const std::string& name) { + return TraceEventSyntheticDelayRegistry::GetInstance()->GetOrCreateDelay( + name.c_str()); +} + +void TraceEventSyntheticDelay::Initialize( + const std::string& name, + TraceEventSyntheticDelayClock* clock) { + name_ = name; + clock_ = clock; +} + +void TraceEventSyntheticDelay::SetTargetDuration(const MonoDelta& target_duration) { + MutexLock lock(lock_); + target_duration_ = target_duration; + trigger_count_ = 0; + begin_count_ = 0; +} + +void TraceEventSyntheticDelay::SetMode(Mode mode) { + MutexLock lock(lock_); + mode_ = mode; +} + +void TraceEventSyntheticDelay::SetClock(TraceEventSyntheticDelayClock* clock) { + MutexLock lock(lock_); + clock_ = clock; +} + +void TraceEventSyntheticDelay::Begin() { + // Note that we check for a non-zero target duration without locking to keep + // things quick for the common case when delays are disabled. Since the delay + // calculation is done with a lock held, it will always be correct. The only + // downside of this is that we may fail to apply some delays when the target + // duration changes. + ANNOTATE_BENIGN_RACE(&target_duration_, "Synthetic delay duration"); + if (!target_duration_.Initialized()) + return; + + MonoTime start_time = clock_->Now(); + { + MutexLock lock(lock_); + if (++begin_count_ != 1) + return; + end_time_ = CalculateEndTimeLocked(start_time); + } +} + +void TraceEventSyntheticDelay::BeginParallel(MonoTime* out_end_time) { + // See note in Begin(). + ANNOTATE_BENIGN_RACE(&target_duration_, "Synthetic delay duration"); + if (!target_duration_.Initialized()) { + *out_end_time = MonoTime(); + return; + } + + MonoTime start_time = clock_->Now(); + { + MutexLock lock(lock_); + *out_end_time = CalculateEndTimeLocked(start_time); + } +} + +void TraceEventSyntheticDelay::End() { + // See note in Begin(). 
+ ANNOTATE_BENIGN_RACE(&target_duration_, "Synthetic delay duration"); + if (!target_duration_.Initialized()) + return; + + MonoTime end_time; + { + MutexLock lock(lock_); + if (!begin_count_ || --begin_count_ != 0) + return; + end_time = end_time_; + } + if (end_time.Initialized()) + ApplyDelay(end_time); +} + +void TraceEventSyntheticDelay::EndParallel(const MonoTime& end_time) { + if (end_time.Initialized()) + ApplyDelay(end_time); +} + +MonoTime TraceEventSyntheticDelay::CalculateEndTimeLocked( + const MonoTime& start_time) { + if (mode_ == ONE_SHOT && trigger_count_++) + return MonoTime(); + else if (mode_ == ALTERNATING && trigger_count_++ % 2) + return MonoTime(); + return start_time + target_duration_; +} + +void TraceEventSyntheticDelay::ApplyDelay(const MonoTime& end_time) { + TRACE_EVENT0("synthetic_delay", name_.c_str()); + while (clock_->Now() < end_time) { + // Busy loop. + } +} + +TraceEventSyntheticDelayRegistry* +TraceEventSyntheticDelayRegistry::GetInstance() { + return Singleton<TraceEventSyntheticDelayRegistry>::get(); +} + +TraceEventSyntheticDelayRegistry::TraceEventSyntheticDelayRegistry() + : delay_count_(0) {} + +TraceEventSyntheticDelay* TraceEventSyntheticDelayRegistry::GetOrCreateDelay( + const char* name) { + // Try to find an existing delay first without locking to make the common case + // fast. + int delay_count = base::subtle::Acquire_Load(&delay_count_); + for (int i = 0; i < delay_count; ++i) { + if (!strcmp(name, delays_[i].name_.c_str())) + return &delays_[i]; + } + + MutexLock lock(lock_); + delay_count = base::subtle::Acquire_Load(&delay_count_); + for (int i = 0; i < delay_count; ++i) { + if (!strcmp(name, delays_[i].name_.c_str())) + return &delays_[i]; + } + + DCHECK(delay_count < kMaxSyntheticDelays) + << "must increase kMaxSyntheticDelays"; + if (delay_count >= kMaxSyntheticDelays) + return &dummy_delay_; + + delays_[delay_count].Initialize(std::string(name), this); + base::subtle::Release_Store(&delay_count_, delay_count + 1); + return &delays_[delay_count]; +} + +MonoTime TraceEventSyntheticDelayRegistry::Now() { + return MonoTime::Now(); +} + +void TraceEventSyntheticDelayRegistry::ResetAllDelays() { + MutexLock lock(lock_); + int delay_count = base::subtle::Acquire_Load(&delay_count_); + for (int i = 0; i < delay_count; ++i) { + delays_[i].SetTargetDuration(MonoDelta()); + delays_[i].SetClock(this); + } +} + +void ResetTraceEventSyntheticDelays() { + TraceEventSyntheticDelayRegistry::GetInstance()->ResetAllDelays(); +} + +} // namespace debug +} // namespace kudu + +namespace trace_event_internal { + +ScopedSyntheticDelay::ScopedSyntheticDelay(const char* name, + AtomicWord* impl_ptr) + : delay_impl_(GetOrCreateDelay(name, impl_ptr)) { + delay_impl_->BeginParallel(&end_time_); +} + +ScopedSyntheticDelay::~ScopedSyntheticDelay() { + delay_impl_->EndParallel(end_time_); +} + +kudu::debug::TraceEventSyntheticDelay* GetOrCreateDelay( + const char* name, + AtomicWord* impl_ptr) { + kudu::debug::TraceEventSyntheticDelay* delay_impl = + reinterpret_cast<kudu::debug::TraceEventSyntheticDelay*>( + base::subtle::Acquire_Load(impl_ptr)); + if (!delay_impl) { + delay_impl = kudu::debug::TraceEventSyntheticDelayRegistry::GetInstance() + ->GetOrCreateDelay(name); + base::subtle::Release_Store( + impl_ptr, reinterpret_cast<AtomicWord>(delay_impl)); + } + return delay_impl; +} + +} // namespace trace_event_internal http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_event_synthetic_delay.h 
---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_event_synthetic_delay.h b/be/src/kudu/util/debug/trace_event_synthetic_delay.h new file mode 100644 index 0000000..f53d5f4 --- /dev/null +++ b/be/src/kudu/util/debug/trace_event_synthetic_delay.h @@ -0,0 +1,162 @@ +// Copyright 2014 The Chromium Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// The synthetic delay framework makes it possible to dynamically inject +// arbitrary delays into into different parts of the codebase. This can be used, +// for instance, for testing various task scheduling algorithms. +// +// The delays are specified in terms of a target duration for a given block of +// code. If the code executes faster than the duration, the thread is made to +// sleep until the deadline is met. +// +// Code can be instrumented for delays with two sets of macros. First, for +// delays that should apply within a scope, use the following macro: +// +// TRACE_EVENT_SYNTHETIC_DELAY("cc.LayerTreeHost.DrawAndSwap"); +// +// For delaying operations that span multiple scopes, use: +// +// TRACE_EVENT_SYNTHETIC_DELAY_BEGIN("cc.Scheduler.BeginMainFrame"); +// ... +// TRACE_EVENT_SYNTHETIC_DELAY_END("cc.Scheduler.BeginMainFrame"); +// +// Here BEGIN establishes the start time for the delay and END executes the +// delay based on the remaining time. If BEGIN is called multiple times in a +// row, END should be called a corresponding number of times. Only the last +// call to END will have an effect. +// +// Note that a single delay may begin on one thread and end on another. This +// implies that a single delay cannot not be applied in several threads at once. + +#ifndef KUDU_UTIL_DEBUG_TRACE_EVENT_SYNTHETIC_DELAY_H_ +#define KUDU_UTIL_DEBUG_TRACE_EVENT_SYNTHETIC_DELAY_H_ + +#include "kudu/gutil/atomicops.h" +#include "kudu/util/debug/trace_event.h" +#include "kudu/util/monotime.h" + +// Apply a named delay in the current scope. +#define TRACE_EVENT_SYNTHETIC_DELAY(name) \ + static AtomicWord INTERNAL_TRACE_EVENT_UID(impl_ptr) = 0; \ + trace_event_internal::ScopedSyntheticDelay INTERNAL_TRACE_EVENT_UID(delay)( \ + name, &INTERNAL_TRACE_EVENT_UID(impl_ptr)); + +// Begin a named delay, establishing its timing start point. May be called +// multiple times as long as the calls to TRACE_EVENT_SYNTHETIC_DELAY_END are +// balanced. Only the first call records the timing start point. +#define TRACE_EVENT_SYNTHETIC_DELAY_BEGIN(name) \ + do { \ + static AtomicWord impl_ptr = 0; \ + trace_event_internal::GetOrCreateDelay(name, &impl_ptr)->Begin(); \ + } while (false) + +// End a named delay. The delay is applied only if this call matches the +// first corresponding call to TRACE_EVENT_SYNTHETIC_DELAY_BEGIN with the +// same delay. +#define TRACE_EVENT_SYNTHETIC_DELAY_END(name) \ + do { \ + static AtomicWord impl_ptr = 0; \ + trace_event_internal::GetOrCreateDelay(name, &impl_ptr)->End(); \ + } while (false) + +namespace kudu { +namespace debug { + +// Time source for computing delay durations. Used for testing. +class TRACE_EVENT_API_CLASS_EXPORT TraceEventSyntheticDelayClock { + public: + TraceEventSyntheticDelayClock(); + virtual ~TraceEventSyntheticDelayClock(); + virtual MonoTime Now() = 0; + + private: + DISALLOW_COPY_AND_ASSIGN(TraceEventSyntheticDelayClock); +}; + +// Single delay point instance. 
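// For illustration (the delay name is taken from the examples above, and
// MonoDelta::FromMilliseconds() is assumed from kudu/util/monotime.h), a test
// can also configure a registered delay programmatically before exercising the
// instrumented code path:
//
//   TraceEventSyntheticDelay* delay =
//       TraceEventSyntheticDelay::Lookup("cc.Scheduler.BeginMainFrame");
//   delay->SetTargetDuration(MonoDelta::FromMilliseconds(16));
//   delay->SetMode(TraceEventSyntheticDelay::ALTERNATING);
//
// Every other matching TRACE_EVENT_SYNTHETIC_DELAY_BEGIN/END pair will then
// take at least 16 ms.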
+class TRACE_EVENT_API_CLASS_EXPORT TraceEventSyntheticDelay { + public: + enum Mode { + STATIC, // Apply the configured delay every time. + ONE_SHOT, // Apply the configured delay just once. + ALTERNATING // Apply the configured delay every other time. + }; + + // Returns an existing named delay instance or creates a new one with |name|. + static TraceEventSyntheticDelay* Lookup(const std::string& name); + + void SetTargetDuration(const MonoDelta& target_duration); + void SetMode(Mode mode); + void SetClock(TraceEventSyntheticDelayClock* clock); + + // Begin the delay, establishing its timing start point. May be called + // multiple times as long as the calls to End() are balanced. Only the first + // call records the timing start point. + void Begin(); + + // End the delay. The delay is applied only if this call matches the first + // corresponding call to Begin() with the same delay. + void End(); + + // Begin a parallel instance of the delay. Several parallel instances may be + // active simultaneously and will complete independently. The computed end + // time for the delay is stored in |out_end_time|, which should later be + // passed to EndParallel(). + void BeginParallel(MonoTime* out_end_time); + + // End a previously started parallel delay. |end_time| is the delay end point + // computed by BeginParallel(). + void EndParallel(const MonoTime& end_time); + + private: + TraceEventSyntheticDelay(); + ~TraceEventSyntheticDelay(); + friend class TraceEventSyntheticDelayRegistry; + + void Initialize(const std::string& name, + TraceEventSyntheticDelayClock* clock); + MonoTime CalculateEndTimeLocked(const MonoTime& start_time); + void ApplyDelay(const MonoTime& end_time); + + Mutex lock_; + Mode mode_; + std::string name_; + int begin_count_; + int trigger_count_; + MonoTime end_time_; + MonoDelta target_duration_; + TraceEventSyntheticDelayClock* clock_; + + DISALLOW_COPY_AND_ASSIGN(TraceEventSyntheticDelay); +}; + +// Set the target durations of all registered synthetic delay points to zero. +TRACE_EVENT_API_CLASS_EXPORT void ResetTraceEventSyntheticDelays(); + +} // namespace debug +} // namespace kudu + +namespace trace_event_internal { + +// Helper class for scoped delays. Do not use directly. +class TRACE_EVENT_API_CLASS_EXPORT ScopedSyntheticDelay { + public: + explicit ScopedSyntheticDelay(const char* name, + AtomicWord* impl_ptr); + ~ScopedSyntheticDelay(); + + private: + kudu::debug::TraceEventSyntheticDelay* delay_impl_; + kudu::MonoTime end_time_; + + DISALLOW_COPY_AND_ASSIGN(ScopedSyntheticDelay); +}; + +// Helper for registering delays. Do not use directly. +TRACE_EVENT_API_CLASS_EXPORT kudu::debug::TraceEventSyntheticDelay* + GetOrCreateDelay(const char* name, AtomicWord* impl_ptr); + +} // namespace trace_event_internal + +#endif /* KUDU_UTIL_DEBUG_TRACE_EVENT_SYNTHETIC_DELAY_H_ */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug/trace_logging.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug/trace_logging.h b/be/src/kudu/util/debug/trace_logging.h new file mode 100644 index 0000000..c497562 --- /dev/null +++ b/be/src/kudu/util/debug/trace_logging.h @@ -0,0 +1,118 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +// This header defines the following macro: +// +// VLOG_AND_TRACE(category, vlevel) +// +// Write a log message to VLOG(vlevel) as well as the current +// trace event buffer as an "INSTANT" trace event type. If the +// given vlog level is not enabled, this will still result in a +// trace buffer entry. +// +// The provided 'category' should be a trace event category, which +// allows the users to filter which trace events to enable. +// For example: +// +// VLOG_AND_TRACE("my_subsystem", 1) << "This always shows up in trace buffers " +// << "but only shows up in the log if VLOG(1) level logging is enabled."; +// +// Most VLOG(1) level log messages are reasonable to use this macro. +// Note that there is slightly more overhead to this macro as opposed +// to just using VLOG(1). +// +// Note that, like VLOG(n), this macro avoids evaluating its arguments unless +// either trace recording or VLOG(n) is enabled. In the case that both are enabled, +// the arguments are only evaluated once. +// +#ifndef KUDU_DEBUG_TRACE_LOGGING_H +#define KUDU_DEBUG_TRACE_LOGGING_H + +#include <glog/logging.h> +#include <string> + +#include "kudu/gutil/macros.h" +#include "kudu/util/debug/trace_event.h" + +// The inner workings of these macros are a bit arcane: +// - We make use of the fact that a block can be embedded within a ternary expression. +// This allows us to determine whether the trace event is enabled before we decide +// to evaluate the arguments. +// - We have to use google::LogMessageVoidify so that we can put 'void(0)' on one side +// of the ternary expression and the log stream on the other. This technique is +// cribbed from glog/logging.h. +#define VLOG_AND_TRACE_INTERNAL(category, vlevel) \ + kudu::debug::TraceVLog(__FILE__, __LINE__, category, VLOG_IS_ON(vlevel)).stream() +#define VLOG_AND_TRACE(category, vlevel) \ + !( { \ + bool enabled; \ + TRACE_EVENT_CATEGORY_GROUP_ENABLED(category, &enabled); \ + enabled || VLOG_IS_ON(vlevel); \ + } ) ? static_cast<void>(0) : \ + google::LogMessageVoidify() & VLOG_AND_TRACE_INTERNAL(category, vlevel) + +namespace kudu { +namespace debug { + +class TraceVLog { + public: + TraceVLog(const char* file, int line, const char* category, bool do_vlog) + : sink_(category), + google_msg_(file, line, google::GLOG_INFO, &sink_, do_vlog) { + } + + std::ostream& stream() { + return google_msg_.stream(); + } + + private: + class TraceLogSink : public google::LogSink { + public: + explicit TraceLogSink(const char* category) : category_(category) {} + void send(google::LogSeverity severity, const char* full_filename, + const char* base_filename, int line, + const struct ::tm* tm_time, const char* message, + size_t message_len) override { + // Rather than calling TRACE_EVENT_INSTANT here, we have to do it from + // the destructor. This is because glog holds its internal mutex while + // calling send(). 
So, if we try to use TRACE_EVENT here, and --trace_to_console + // is enabled, then we'd end up calling back into glog when its lock is already + // held. glog isn't re-entrant, so that causes a crash. + // + // By just storing the string here, and then emitting the trace in the dtor, + // we defer the tracing until the google::LogMessage has destructed and the + // glog lock is available again. + str_ = ToString(severity, base_filename, line, + tm_time, message, message_len); + } + virtual ~TraceLogSink() { + TRACE_EVENT_INSTANT1(category_, "vlog", TRACE_EVENT_SCOPE_THREAD, + "msg", str_); + } + + private: + const char* const category_; + std::string str_; + }; + + TraceLogSink sink_; + google::LogMessage google_msg_; +}; + +} // namespace debug +} // namespace kudu +#endif /* KUDU_DEBUG_TRACE_LOGGING_H */ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/debug_ref_counted.h ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/debug_ref_counted.h b/be/src/kudu/util/debug_ref_counted.h new file mode 100644 index 0000000..7c2deca --- /dev/null +++ b/be/src/kudu/util/debug_ref_counted.h @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef KUDU_UTIL_DEBUG_REF_COUNTED_H_ +#define KUDU_UTIL_DEBUG_REF_COUNTED_H_ + +#include <glog/logging.h> + +#include "kudu/gutil/ref_counted.h" +#include "kudu/util/debug-util.h" + +namespace kudu { + +// For use in debugging. Change a ref-counted class to inherit from this, +// instead of RefCountedThreadSafe, and fill your logs with stack traces. 
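// For example, a hypothetical class declared as
//
//   class MyObj : public RefCountedThreadSafe<MyObj> { ... };
//
// can be switched, temporarily, to
//
//   class MyObj : public DebugRefCountedThreadSafe<MyObj> { ... };
//
// after which every AddRef() and Release() on it is logged together with the
// stack trace of the caller.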
+template <class T, typename Traits = DefaultRefCountedThreadSafeTraits<T> > +class DebugRefCountedThreadSafe : public RefCountedThreadSafe<T, Traits> { + public: + DebugRefCountedThreadSafe() {} + + void AddRef() const { + RefCountedThreadSafe<T, Traits>::AddRef(); + LOG(INFO) << "Incremented ref on " << this << ":\n" << GetStackTrace(); + } + + void Release() const { + LOG(INFO) << "Decrementing ref on " << this << ":\n" << GetStackTrace(); + RefCountedThreadSafe<T, Traits>::Release(); + } + + protected: + ~DebugRefCountedThreadSafe() {} + + private: + friend struct DefaultRefCountedThreadSafeTraits<T>; + + DISALLOW_COPY_AND_ASSIGN(DebugRefCountedThreadSafe); +}; + +} // namespace kudu + +#endif // KUDU_UTIL_DEBUG_REF_COUNTED_H_ http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env-test.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/env-test.cc b/be/src/kudu/util/env-test.cc new file mode 100644 index 0000000..fb79a69 --- /dev/null +++ b/be/src/kudu/util/env-test.cc @@ -0,0 +1,981 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <fcntl.h> +#include <sys/types.h> + +#include <memory> +#include <string> + +#include <glog/logging.h> +#include <glog/stl_logging.h> +#include <gtest/gtest.h> + +#include "kudu/gutil/bind.h" +#include "kudu/gutil/strings/human_readable.h" +#include "kudu/gutil/strings/substitute.h" +#include "kudu/gutil/strings/util.h" +#include "kudu/util/env.h" +#include "kudu/util/env_util.h" +#include "kudu/util/malloc.h" +#include "kudu/util/path_util.h" +#include "kudu/util/status.h" +#include "kudu/util/stopwatch.h" +#include "kudu/util/test_util.h" + +#if !defined(__APPLE__) +#include <linux/falloc.h> +#endif // !defined(__APPLE__) +// Copied from falloc.h. Useful for older kernels that lack support for +// hole punching; fallocate(2) will return EOPNOTSUPP. +#ifndef FALLOC_FL_KEEP_SIZE +#define FALLOC_FL_KEEP_SIZE 0x01 /* default is extend size */ +#endif +#ifndef FALLOC_FL_PUNCH_HOLE +#define FALLOC_FL_PUNCH_HOLE 0x02 /* de-allocates range */ +#endif + +DECLARE_bool(never_fsync); +DECLARE_int32(env_inject_short_read_bytes); +DECLARE_int32(env_inject_short_write_bytes); + +namespace kudu { + +using std::shared_ptr; +using std::string; +using std::unique_ptr; +using std::vector; +using strings::Substitute; + +static const uint64_t kOneMb = 1024 * 1024; +static const uint64_t kTwoMb = 2 * kOneMb; + +class TestEnv : public KuduTest { + public: + virtual void SetUp() OVERRIDE { + KuduTest::SetUp(); + CheckFallocateSupport(); + } + + // Verify that fallocate() is supported in the test directory. + // Some local file systems like ext3 do not support it, and we don't + // want to fail tests on those systems. 
+ // + // Sets fallocate_supported_ based on the result. + void CheckFallocateSupport() { + static bool checked = false; + if (checked) return; + +#if defined(__linux__) + int fd = creat(GetTestPath("check-fallocate").c_str(), S_IWUSR); + PCHECK(fd >= 0); + int err = fallocate(fd, 0, 0, 4096); + if (err != 0) { + PCHECK(errno == ENOTSUP); + } else { + fallocate_supported_ = true; + + err = fallocate(fd, FALLOC_FL_KEEP_SIZE | FALLOC_FL_PUNCH_HOLE, + 1024, 1024); + if (err != 0) { + PCHECK(errno == ENOTSUP); + } else { + fallocate_punch_hole_supported_ = true; + } + } + + close(fd); +#endif + + checked = true; + } + + protected: + + void VerifyTestData(const Slice& read_data, size_t offset) { + for (int i = 0; i < read_data.size(); i++) { + size_t file_offset = offset + i; + ASSERT_EQ((file_offset * 31) & 0xff, read_data[i]) << "failed at " << i; + } + } + + void MakeVectors(int num_slices, int slice_size, int num_iterations, + unique_ptr<faststring[]>* data, vector<vector<Slice > >* vec) { + data->reset(new faststring[num_iterations * num_slices]); + vec->resize(num_iterations); + + int data_idx = 0; + int byte_idx = 0; + for (int vec_idx = 0; vec_idx < num_iterations; vec_idx++) { + vector<Slice>& iter_vec = vec->at(vec_idx); + iter_vec.resize(num_slices); + for (int i = 0; i < num_slices; i++) { + (*data)[data_idx].resize(slice_size); + for (int j = 0; j < slice_size; j++) { + (*data)[data_idx][j] = (byte_idx * 31) & 0xff; + ++byte_idx; + } + iter_vec[i]= Slice((*data)[data_idx]); + ++data_idx; + } + } + } + + void ReadAndVerifyTestData(RandomAccessFile* raf, size_t offset, size_t n) { + unique_ptr<uint8_t[]> scratch(new uint8_t[n]); + Slice s(scratch.get(), n); + ASSERT_OK(raf->Read(offset, &s)); + ASSERT_EQ(n, s.size()); + ASSERT_NO_FATAL_FAILURE(VerifyTestData(s, offset)); + } + + void TestAppendV(size_t num_slices, size_t slice_size, size_t iterations, + bool fast, bool pre_allocate, + const WritableFileOptions &opts) { + const string kTestPath = GetTestPath("test_env_appendvec_read_append"); + shared_ptr<WritableFile> file; + ASSERT_OK(env_util::OpenFileForWrite(opts, env_, kTestPath, &file)); + + if (pre_allocate) { + ASSERT_OK(file->PreAllocate(num_slices * slice_size * iterations)); + ASSERT_OK(file->Sync()); + } + + unique_ptr<faststring[]> data; + vector<vector<Slice> > input; + + MakeVectors(num_slices, slice_size, iterations, &data, &input); + + // Force short writes to half the slice length. + FLAGS_env_inject_short_write_bytes = slice_size / 2; + + shared_ptr<RandomAccessFile> raf; + + if (!fast) { + ASSERT_OK(env_util::OpenFileForRandom(env_, kTestPath, &raf)); + } + + srand(123); + + const string test_descr = Substitute( + "appending a vector of slices(number of slices=$0,size of slice=$1 b) $2 times", + num_slices, slice_size, iterations); + LOG_TIMING(INFO, test_descr) { + for (int i = 0; i < iterations; i++) { + if (fast || random() % 2) { + ASSERT_OK(file->AppendV(input[i])); + } else { + for (const Slice& slice : input[i]) { + ASSERT_OK(file->Append(slice)); + } + } + if (!fast) { + // Verify as write. Note: this requires that file is pre-allocated, otherwise + // the Read() fails with EINVAL. 
+ ASSERT_NO_FATAL_FAILURE(ReadAndVerifyTestData(raf.get(), num_slices * slice_size * i, + num_slices * slice_size)); + } + } + } + + // Verify the entire file + ASSERT_OK(file->Close()); + + if (fast) { + ASSERT_OK(env_util::OpenFileForRandom(env_, kTestPath, &raf)); + } + for (int i = 0; i < iterations; i++) { + ASSERT_NO_FATAL_FAILURE(ReadAndVerifyTestData(raf.get(), num_slices * slice_size * i, + num_slices * slice_size)); + } + } + + static bool fallocate_supported_; + static bool fallocate_punch_hole_supported_; +}; + +bool TestEnv::fallocate_supported_ = false; +bool TestEnv::fallocate_punch_hole_supported_ = false; + +TEST_F(TestEnv, TestPreallocate) { + if (!fallocate_supported_) { + LOG(INFO) << "fallocate not supported, skipping test"; + return; + } + LOG(INFO) << "Testing PreAllocate()"; + string test_path = GetTestPath("test_env_wf"); + shared_ptr<WritableFile> file; + ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &file)); + + // pre-allocate 1 MB + ASSERT_OK(file->PreAllocate(kOneMb)); + ASSERT_OK(file->Sync()); + + // the writable file size should report 0 + ASSERT_EQ(file->Size(), 0); + // but the real size of the file on disk should report 1MB + uint64_t size; + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(size, kOneMb); + + // write 1 MB + uint8_t scratch[kOneMb]; + Slice slice(scratch, kOneMb); + ASSERT_OK(file->Append(slice)); + ASSERT_OK(file->Sync()); + + // the writable file size should now report 1 MB + ASSERT_EQ(file->Size(), kOneMb); + ASSERT_OK(file->Close()); + // and the real size for the file on disk should match ony the + // written size + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(kOneMb, size); +} + +// To test consecutive pre-allocations we need higher pre-allocations since the +// mmapped regions grow in size until 2MBs (so smaller pre-allocations will easily +// be smaller than the mmapped regions size). 
+TEST_F(TestEnv, TestConsecutivePreallocate) { + if (!fallocate_supported_) { + LOG(INFO) << "fallocate not supported, skipping test"; + return; + } + LOG(INFO) << "Testing consecutive PreAllocate()"; + string test_path = GetTestPath("test_env_wf"); + shared_ptr<WritableFile> file; + ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &file)); + + // pre-allocate 64 MB + ASSERT_OK(file->PreAllocate(64 * kOneMb)); + ASSERT_OK(file->Sync()); + + // the writable file size should report 0 + ASSERT_EQ(file->Size(), 0); + // but the real size of the file on disk should report 64 MBs + uint64_t size; + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(size, 64 * kOneMb); + + // write 1 MB + uint8_t scratch[kOneMb]; + Slice slice(scratch, kOneMb); + ASSERT_OK(file->Append(slice)); + ASSERT_OK(file->Sync()); + + // the writable file size should now report 1 MB + ASSERT_EQ(kOneMb, file->Size()); + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(64 * kOneMb, size); + + // pre-allocate 64 additional MBs + ASSERT_OK(file->PreAllocate(64 * kOneMb)); + ASSERT_OK(file->Sync()); + + // the writable file size should now report 1 MB + ASSERT_EQ(kOneMb, file->Size()); + // while the real file size should report 128 MB's + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(128 * kOneMb, size); + + // write another MB + ASSERT_OK(file->Append(slice)); + ASSERT_OK(file->Sync()); + + // the writable file size should now report 2 MB + ASSERT_EQ(file->Size(), 2 * kOneMb); + // while the real file size should reamin at 128 MBs + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(128 * kOneMb, size); + + // close the file (which ftruncates it to the real size) + ASSERT_OK(file->Close()); + // and the real size for the file on disk should match only the written size + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(2* kOneMb, size); + +} + +TEST_F(TestEnv, TestHolePunch) { + if (!fallocate_punch_hole_supported_) { + LOG(INFO) << "hole punching not supported, skipping test"; + return; + } + string test_path = GetTestPath("test_env_wf"); + unique_ptr<RWFile> file; + ASSERT_OK(env_->NewRWFile(test_path, &file)); + + // Write 1 MB. The size and size-on-disk both agree. + uint8_t scratch[kOneMb]; + Slice slice(scratch, kOneMb); + ASSERT_OK(file->Write(0, slice)); + ASSERT_OK(file->Sync()); + uint64_t sz; + ASSERT_OK(file->Size(&sz)); + ASSERT_EQ(kOneMb, sz); + uint64_t size_on_disk; + ASSERT_OK(env_->GetFileSizeOnDisk(test_path, &size_on_disk)); + // Some kernels and filesystems (e.g. Centos 6.6 with XFS) aggressively + // preallocate file disk space when writing to files, so the disk space may be + // greater than 1MiB. + ASSERT_LE(kOneMb, size_on_disk); + + // Punch some data out at byte marker 4096. Now the two sizes diverge. + uint64_t punch_amount = 4096 * 4; + uint64_t new_size_on_disk; + ASSERT_OK(file->PunchHole(4096, punch_amount)); + ASSERT_OK(file->Size(&sz)); + ASSERT_EQ(kOneMb, sz); + ASSERT_OK(env_->GetFileSizeOnDisk(test_path, &new_size_on_disk)); + ASSERT_EQ(size_on_disk - punch_amount, new_size_on_disk); +} + +TEST_F(TestEnv, TestTruncate) { + LOG(INFO) << "Testing Truncate()"; + string test_path = GetTestPath("test_env_wf"); + unique_ptr<RWFile> file; + ASSERT_OK(env_->NewRWFile(test_path, &file)); + uint64_t size; + ASSERT_OK(file->Size(&size)); + ASSERT_EQ(0, size); + + // Truncate to 2 MB (up). 
+ ASSERT_OK(file->Truncate(kTwoMb)); + ASSERT_OK(file->Size(&size)); + ASSERT_EQ(kTwoMb, size); + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(kTwoMb, size); + + // Truncate to 1 MB (down). + ASSERT_OK(file->Truncate(kOneMb)); + ASSERT_OK(file->Size(&size)); + ASSERT_EQ(kOneMb, size); + ASSERT_OK(env_->GetFileSize(test_path, &size)); + ASSERT_EQ(kOneMb, size); + + ASSERT_OK(file->Close()); + + // Read the whole file. Ensure it is all zeroes. + unique_ptr<RandomAccessFile> raf; + ASSERT_OK(env_->NewRandomAccessFile(test_path, &raf)); + unique_ptr<uint8_t[]> scratch(new uint8_t[size]); + Slice s(scratch.get(), size); + ASSERT_OK(raf->Read(0, &s)); + const uint8_t* data = s.data(); + for (int i = 0; i < size; i++) { + ASSERT_EQ(0, data[i]) << "Not null at position " << i; + } +} + +// Write 'size' bytes of data to a file, with a simple pattern stored in it. +static void WriteTestFile(Env* env, const string& path, size_t size) { + shared_ptr<WritableFile> wf; + ASSERT_OK(env_util::OpenFileForWrite(env, path, &wf)); + faststring data; + data.resize(size); + for (int i = 0; i < data.size(); i++) { + data[i] = (i * 31) & 0xff; + } + ASSERT_OK(wf->Append(Slice(data))); + ASSERT_OK(wf->Close()); +} + +TEST_F(TestEnv, TestReadFully) { + SeedRandom(); + const string kTestPath = GetTestPath("test"); + const int kFileSize = 64 * 1024; + Env* env = Env::Default(); + + WriteTestFile(env, kTestPath, kFileSize); + ASSERT_NO_FATAL_FAILURE(); + + // Reopen for read + shared_ptr<RandomAccessFile> raf; + ASSERT_OK(env_util::OpenFileForRandom(env, kTestPath, &raf)); + + const int kReadLength = 10000; + unique_ptr<uint8_t[]> scratch(new uint8_t[kReadLength]); + Slice s(scratch.get(), kReadLength); + + // Force a short read to half the data length + FLAGS_env_inject_short_read_bytes = kReadLength / 2; + + // Verify that Read fully reads the whole requested data. + ASSERT_OK(raf->Read(0, &s)); + ASSERT_EQ(s.data(), scratch.get()) << "Should have returned a contiguous copy"; + ASSERT_EQ(kReadLength, s.size()); + + // Verify that the data read was correct. + VerifyTestData(s, 0); + + // Turn short reads off again + FLAGS_env_inject_short_read_bytes = 0; + + // Verify that Read fails with an IOError at EOF. + Slice s2(scratch.get(), 200); + Status status = raf->Read(kFileSize - 100, &s2); + ASSERT_FALSE(status.ok()); + ASSERT_TRUE(status.IsIOError()); + ASSERT_STR_CONTAINS(status.ToString(), "EOF"); +} + +TEST_F(TestEnv, TestReadVFully) { + // Create the file. + unique_ptr<RWFile> file; + ASSERT_OK(env_->NewRWFile(GetTestPath("foo"), &file)); + + // Append to it. + string kTestData = "abcde12345"; + ASSERT_OK(file->Write(0, kTestData)); + + // Setup read parameters + size_t size1 = 5; + uint8_t scratch1[size1]; + Slice result1(scratch1, size1); + size_t size2 = 5; + uint8_t scratch2[size2]; + Slice result2(scratch2, size2); + vector<Slice> results = { result1, result2 }; + + // Force a short read + FLAGS_env_inject_short_read_bytes = 3; + + // Verify that Read fully reads the whole requested data. + ASSERT_OK(file->ReadV(0, &results)); + ASSERT_EQ(result1, "abcde"); + ASSERT_EQ(result2, "12345"); + + // Turn short reads off again + FLAGS_env_inject_short_read_bytes = 0; + + // Verify that Read fails with an IOError at EOF. 
+ Status status = file->ReadV(5, &results); + ASSERT_FALSE(status.ok()); + ASSERT_TRUE(status.IsIOError()); + ASSERT_STR_CONTAINS(status.ToString(), "EOF"); +} + +TEST_F(TestEnv, TestIOVMax) { + Env* env = Env::Default(); + const string kTestPath = GetTestPath("test"); + + const size_t slice_count = IOV_MAX + 42; + const size_t slice_size = 5; + const size_t data_size = slice_count * slice_size; + + NO_FATALS(WriteTestFile(env, kTestPath, data_size)); + + // Reopen for read + shared_ptr<RandomAccessFile> file; + ASSERT_OK(env_util::OpenFileForRandom(env, kTestPath, &file)); + + // Setup more results slices than IOV_MAX + uint8_t scratch[data_size]; + vector<Slice> results; + for (size_t i = 0; i < slice_count; i++) { + size_t shift = slice_size * i; + results.emplace_back(scratch + shift, slice_size); + } + + // Force a short read too + FLAGS_env_inject_short_read_bytes = 3; + + // Verify all the data is read + ASSERT_OK(file->ReadV(0, &results)); + VerifyTestData(Slice(scratch, data_size), 0); +} + +TEST_F(TestEnv, TestAppendV) { + WritableFileOptions opts; + LOG(INFO) << "Testing AppendV() only, NO pre-allocation"; + ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, false, opts)); + + if (!fallocate_supported_) { + LOG(INFO) << "fallocate not supported, skipping preallocated runs"; + } else { + LOG(INFO) << "Testing AppendV() only, WITH pre-allocation"; + ASSERT_NO_FATAL_FAILURE(TestAppendV(2000, 1024, 5, true, true, opts)); + LOG(INFO) << "Testing AppendV() together with Append() and Read(), WITH pre-allocation"; + ASSERT_NO_FATAL_FAILURE(TestAppendV(128, 4096, 5, false, true, opts)); + } +} + +TEST_F(TestEnv, TestGetExecutablePath) { + string p; + ASSERT_OK(Env::Default()->GetExecutablePath(&p)); + ASSERT_TRUE(HasSuffixString(p, "env-test")) << p; +} + +TEST_F(TestEnv, TestOpenEmptyRandomAccessFile) { + Env* env = Env::Default(); + string test_file = GetTestPath("test_file"); + ASSERT_NO_FATAL_FAILURE(WriteTestFile(env, test_file, 0)); + unique_ptr<RandomAccessFile> readable_file; + ASSERT_OK(env->NewRandomAccessFile(test_file, &readable_file)); + uint64_t size; + ASSERT_OK(readable_file->Size(&size)); + ASSERT_EQ(0, size); +} + +TEST_F(TestEnv, TestOverwrite) { + string test_path = GetTestPath("test_env_wf"); + + // File does not exist, create it. + shared_ptr<WritableFile> writer; + ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &writer)); + + // File exists, overwrite it. + ASSERT_OK(env_util::OpenFileForWrite(env_, test_path, &writer)); + + // File exists, try to overwrite (and fail). + WritableFileOptions opts; + opts.mode = Env::CREATE_NON_EXISTING; + Status s = env_util::OpenFileForWrite(opts, + env_, test_path, &writer); + ASSERT_TRUE(s.IsAlreadyPresent()); +} + +TEST_F(TestEnv, TestReopen) { + LOG(INFO) << "Testing reopening behavior"; + string test_path = GetTestPath("test_env_wf"); + string first = "The quick brown fox"; + string second = "jumps over the lazy dog"; + + // Create the file and write to it. + shared_ptr<WritableFile> writer; + ASSERT_OK(env_util::OpenFileForWrite(WritableFileOptions(), + env_, test_path, &writer)); + ASSERT_OK(writer->Append(first)); + ASSERT_EQ(first.length(), writer->Size()); + ASSERT_OK(writer->Close()); + + // Reopen it and append to it. 
+ WritableFileOptions reopen_opts; + reopen_opts.mode = Env::OPEN_EXISTING; + ASSERT_OK(env_util::OpenFileForWrite(reopen_opts, + env_, test_path, &writer)); + ASSERT_EQ(first.length(), writer->Size()); + ASSERT_OK(writer->Append(second)); + ASSERT_EQ(first.length() + second.length(), writer->Size()); + ASSERT_OK(writer->Close()); + + // Check that the file has both strings. + shared_ptr<RandomAccessFile> reader; + ASSERT_OK(env_util::OpenFileForRandom(env_, test_path, &reader)); + uint64_t size; + ASSERT_OK(reader->Size(&size)); + ASSERT_EQ(first.length() + second.length(), size); + uint8_t scratch[size]; + Slice s(scratch, size); + ASSERT_OK(reader->Read(0, &s)); + ASSERT_EQ(first + second, s.ToString()); +} + +TEST_F(TestEnv, TestIsDirectory) { + string dir = GetTestPath("a_directory"); + ASSERT_OK(env_->CreateDir(dir)); + bool is_dir; + ASSERT_OK(env_->IsDirectory(dir, &is_dir)); + ASSERT_TRUE(is_dir); + + string not_dir = GetTestPath("not_a_directory"); + unique_ptr<WritableFile> writer; + ASSERT_OK(env_->NewWritableFile(not_dir, &writer)); + ASSERT_OK(env_->IsDirectory(not_dir, &is_dir)); + ASSERT_FALSE(is_dir); +} + +// Regression test for KUDU-1776. +TEST_F(TestEnv, TestIncreaseOpenFileLimit) { + int64_t limit_before = env_->GetOpenFileLimit(); + env_->IncreaseOpenFileLimit(); + int64_t limit_after = env_->GetOpenFileLimit(); + ASSERT_GE(limit_after, limit_before) << "Failed to retain/increase open file limit"; +} + +static Status TestWalkCb(vector<string>* actual, + Env::FileType type, + const string& dirname, const string& basename) { + VLOG(1) << type << ":" << dirname << ":" << basename; + actual->push_back(JoinPathSegments(dirname, basename)); + return Status::OK(); +} + +static Status CreateDir(Env* env, const string& name, vector<string>* created) { + RETURN_NOT_OK(env->CreateDir(name)); + created->push_back(name); + return Status::OK(); +} + +static Status CreateFile(Env* env, const string& name, vector<string>* created) { + unique_ptr<WritableFile> writer; + RETURN_NOT_OK(env->NewWritableFile(name, &writer)); + created->push_back(writer->filename()); + return Status::OK(); +} + +TEST_F(TestEnv, TestWalk) { + // We test with this tree: + // + // /root/ + // /root/file_1 + // /root/file_2 + // /root/dir_a/file_1 + // /root/dir_a/file_2 + // /root/dir_b/file_1 + // /root/dir_b/file_2 + // /root/dir_b/dir_c/file_1 + // /root/dir_b/dir_c/file_2 + string root = GetTestPath("root"); + string subdir_a = JoinPathSegments(root, "dir_a"); + string subdir_b = JoinPathSegments(root, "dir_b"); + string subdir_c = JoinPathSegments(subdir_b, "dir_c"); + string file_one = "file_1"; + string file_two = "file_2"; + vector<string> expected; + ASSERT_OK(CreateDir(env_, root, &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(root, file_one), &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(root, file_two), &expected)); + ASSERT_OK(CreateDir(env_, subdir_a, &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_a, file_one), &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_a, file_two), &expected)); + ASSERT_OK(CreateDir(env_, subdir_b, &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_b, file_one), &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_b, file_two), &expected)); + ASSERT_OK(CreateDir(env_, subdir_c, &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_c, file_one), &expected)); + ASSERT_OK(CreateFile(env_, JoinPathSegments(subdir_c, file_two), &expected)); + + // Do the walk. 
+ // + // Sadly, tr1/unordered_set doesn't implement equality operators, so we + // compare sorted vectors instead. + vector<string> actual; + ASSERT_OK(env_->Walk(root, Env::PRE_ORDER, Bind(&TestWalkCb, &actual))); + sort(expected.begin(), expected.end()); + sort(actual.begin(), actual.end()); + ASSERT_EQ(expected, actual); +} + +static Status TestWalkErrorCb(int* num_calls, + Env::FileType type, + const string& dirname, const string& basename) { + (*num_calls)++; + return Status::Aborted("Returning abort status"); +} + +TEST_F(TestEnv, TestWalkCbReturnsError) { + string new_dir = GetTestPath("foo"); + string new_file = "myfile"; + ASSERT_OK(env_->CreateDir(new_dir)); + unique_ptr<WritableFile> writer; + ASSERT_OK(env_->NewWritableFile(JoinPathSegments(new_dir, new_file), &writer)); + int num_calls = 0; + ASSERT_TRUE(env_->Walk(new_dir, Env::PRE_ORDER, + Bind(&TestWalkErrorCb, &num_calls)).IsIOError()); + + // Once for the directory and once for the file inside it. + ASSERT_EQ(2, num_calls); +} + +TEST_F(TestEnv, TestGlob) { + string dir = GetTestPath("glob"); + ASSERT_OK(env_->CreateDir(dir)); + + vector<string> filenames = { "fuzz", "fuzzy", "fuzzyiest", "buzz" }; + vector<pair<string, size_t>> matchers = { + { "file", 0 }, + { "fuzz", 1 }, + { "fuzz*", 3 }, + { "?uzz", 2 }, + }; + + for (const auto& name : filenames) { + unique_ptr<WritableFile> file; + ASSERT_OK(env_->NewWritableFile(JoinPathSegments(dir, name), &file)); + } + + for (const auto& matcher : matchers) { + SCOPED_TRACE(Substitute("pattern: $0, expected matches: $1", + matcher.first, matcher.second)); + vector<string> matches; + ASSERT_OK(env_->Glob(JoinPathSegments(dir, matcher.first), &matches)); + ASSERT_EQ(matcher.second, matches.size()); + } +} + +TEST_F(TestEnv, TestGetBlockSize) { + uint64_t block_size; + + // Does not exist. + ASSERT_TRUE(env_->GetBlockSize("does_not_exist", &block_size).IsNotFound()); + + // Try with a directory. + ASSERT_OK(env_->GetBlockSize(".", &block_size)); + ASSERT_GT(block_size, 0); + + // Try with a file. + string path = GetTestPath("foo"); + unique_ptr<WritableFile> writer; + ASSERT_OK(env_->NewWritableFile(path, &writer)); + ASSERT_OK(env_->GetBlockSize(path, &block_size)); + ASSERT_GT(block_size, 0); +} + +TEST_F(TestEnv, TestGetFileModifiedTime) { + string path = GetTestPath("mtime"); + unique_ptr<WritableFile> writer; + ASSERT_OK(env_->NewWritableFile(path, &writer)); + + int64_t initial_time; + ASSERT_OK(env_->GetFileModifiedTime(writer->filename(), &initial_time)); + + // HFS has 1 second mtime granularity. + AssertEventually([&] { + int64_t after_time; + writer->Append(" "); + writer->Sync(); + ASSERT_OK(env_->GetFileModifiedTime(writer->filename(), &after_time)); + ASSERT_LT(initial_time, after_time); + }, MonoDelta::FromSeconds(5)); + NO_PENDING_FATALS(); +} + +TEST_F(TestEnv, TestRWFile) { + // Create the file. + unique_ptr<RWFile> file; + ASSERT_OK(env_->NewRWFile(GetTestPath("foo"), &file)); + + // Append to it. + string kTestData = "abcde"; + ASSERT_OK(file->Write(0, kTestData)); + + // Read from it. 
+ uint8_t scratch[kTestData.length()];
+ Slice result(scratch, kTestData.length());
+ ASSERT_OK(file->Read(0, &result));
+ ASSERT_EQ(result, kTestData);
+ uint64_t sz;
+ ASSERT_OK(file->Size(&sz));
+ ASSERT_EQ(kTestData.length(), sz);
+
+ // Read into multiple buffers
+ size_t size1 = 3;
+ uint8_t scratch1[size1];
+ Slice result1(scratch1, size1);
+ size_t size2 = 2;
+ uint8_t scratch2[size2];
+ Slice result2(scratch2, size2);
+ vector<Slice> results = { result1, result2 };
+ ASSERT_OK(file->ReadV(0, &results));
+ ASSERT_EQ(result1, "abc");
+ ASSERT_EQ(result2, "de");
+
+ // Write past the end of the file and rewrite some of the interior.
+ ASSERT_OK(file->Write(kTestData.length() * 2, kTestData));
+ ASSERT_OK(file->Write(kTestData.length(), kTestData));
+ ASSERT_OK(file->Write(1, kTestData));
+ string kNewTestData = "aabcdebcdeabcde";
+ uint8_t scratch3[kNewTestData.length()];
+ Slice result3(scratch3, kNewTestData.length());
+ ASSERT_OK(file->Read(0, &result3));
+
+ // Retest.
+ ASSERT_EQ(result3, kNewTestData);
+ ASSERT_OK(file->Size(&sz));
+ ASSERT_EQ(kNewTestData.length(), sz);
+
+ // Make sure we can't overwrite it.
+ RWFileOptions opts;
+ opts.mode = Env::CREATE_NON_EXISTING;
+ ASSERT_TRUE(env_->NewRWFile(opts, GetTestPath("foo"), &file).IsAlreadyPresent());
+
+ // Reopen it without truncating the existing data.
+ opts.mode = Env::OPEN_EXISTING;
+ ASSERT_OK(env_->NewRWFile(opts, GetTestPath("foo"), &file));
+ uint8_t scratch4[kNewTestData.length()];
+ Slice result4(scratch4, kNewTestData.length());
+ ASSERT_OK(file->Read(0, &result4));
+ ASSERT_EQ(result4, kNewTestData);
+}
+
+TEST_F(TestEnv, TestCanonicalize) {
+ vector<string> synonyms = { GetTestPath("."), GetTestPath("./."), GetTestPath(".//./") };
+ for (const string& synonym : synonyms) {
+ string result;
+ ASSERT_OK(env_->Canonicalize(synonym, &result));
+ ASSERT_EQ(test_dir_, result);
+ }
+
+ string dir = GetTestPath("some_dir");
+ ASSERT_OK(env_->CreateDir(dir));
+ string result;
+ ASSERT_OK(env_->Canonicalize(dir + "/", &result));
+ ASSERT_EQ(dir, result);
+
+ ASSERT_TRUE(env_->Canonicalize(dir + "/bar", nullptr).IsNotFound());
+}
+
+TEST_F(TestEnv, TestGetTotalRAMBytes) {
+ int64_t ram = 0;
+ ASSERT_OK(env_->GetTotalRAMBytes(&ram));
+
+ // Can't test much about it.
+ ASSERT_GT(ram, 0);
+}
+
+// Test that CopyFile() copies all the bytes properly.
+TEST_F(TestEnv, TestCopyFile) {
+ string orig_path = GetTestPath("test");
+ string copy_path = orig_path + ".copy";
+ const int kFileSize = 1024 * 1024 + 11; // Some odd number of bytes.
+
+ Env* env = Env::Default();
+ NO_FATALS(WriteTestFile(env, orig_path, kFileSize));
+ ASSERT_OK(env_util::CopyFile(env, orig_path, copy_path, WritableFileOptions()));
+ unique_ptr<RandomAccessFile> copy;
+ ASSERT_OK(env->NewRandomAccessFile(copy_path, &copy));
+ NO_FATALS(ReadAndVerifyTestData(copy.get(), 0, kFileSize));
+}
+
+// Simple regression test for NewTempRWFile().
+TEST_F(TestEnv, TestTempRWFile) {
+ string tmpl = "foo.XXXXXX";
+ string path;
+ unique_ptr<RWFile> file;
+
+ ASSERT_OK(env_->NewTempRWFile(RWFileOptions(), tmpl, &path, &file));
+ ASSERT_NE(path, tmpl);
+ ASSERT_EQ(0, path.find("foo."));
+ ASSERT_OK(file->Close());
+ ASSERT_OK(env_->DeleteFile(path));
+}
+
+// Test that when we write data to disk we see SpaceInfo.free_bytes go down.
+TEST_F(TestEnv, TestGetSpaceInfoFreeBytes) { + const string kDataDir = GetTestPath("parent"); + const string kTestFilePath = JoinPathSegments(kDataDir, "testfile"); + const int kFileSizeBytes = 256; + ASSERT_OK(env_->CreateDir(kDataDir)); + + // Loop in case there are concurrent tests running that are modifying the + // filesystem. + ASSERT_EVENTUALLY([&] { + if (env_->FileExists(kTestFilePath)) { + ASSERT_OK(env_->DeleteFile(kTestFilePath)); // Clean up the previous iteration. + } + SpaceInfo before_space_info; + ASSERT_OK(env_->GetSpaceInfo(kDataDir, &before_space_info)); + + NO_FATALS(WriteTestFile(env_, kTestFilePath, kFileSizeBytes)); + + SpaceInfo after_space_info; + ASSERT_OK(env_->GetSpaceInfo(kDataDir, &after_space_info)); + ASSERT_GE(before_space_info.free_bytes - after_space_info.free_bytes, kFileSizeBytes); + }); +} + +// Basic sanity check for GetSpaceInfo(). +TEST_F(TestEnv, TestGetSpaceInfoBasicInvariants) { + string path = GetTestDataDirectory(); + SpaceInfo space_info; + ASSERT_OK(env_->GetSpaceInfo(path, &space_info)); + ASSERT_GT(space_info.capacity_bytes, 0); + ASSERT_LE(space_info.free_bytes, space_info.capacity_bytes); + VLOG(1) << "Path " << path << " has capacity " + << HumanReadableNumBytes::ToString(space_info.capacity_bytes) + << " (" << HumanReadableNumBytes::ToString(space_info.free_bytes) << " free)"; +} + +TEST_F(TestEnv, TestChangeDir) { + string orig_dir; + ASSERT_OK(env_->GetCurrentWorkingDir(&orig_dir)); + + string cwd; + ASSERT_OK(env_->ChangeDir("/")); + ASSERT_OK(env_->GetCurrentWorkingDir(&cwd)); + ASSERT_EQ("/", cwd); + + ASSERT_OK(env_->ChangeDir(test_dir_)); + ASSERT_OK(env_->GetCurrentWorkingDir(&cwd)); + ASSERT_EQ(test_dir_, cwd); + + ASSERT_OK(env_->ChangeDir(orig_dir)); + ASSERT_OK(env_->GetCurrentWorkingDir(&cwd)); + ASSERT_EQ(orig_dir, cwd); +} + +TEST_F(TestEnv, TestGetExtentMap) { + // In order to force filesystems that use delayed allocation to write out the + // extents, we must Sync() after the file is done growing, and that should + // trigger a real fsync() to the filesystem. + FLAGS_never_fsync = false; + + const string kTestFilePath = GetTestPath("foo"); + const int kFileSizeBytes = 1024*1024; + + // Create a test file of a particular size. + unique_ptr<RWFile> f; + ASSERT_OK(env_->NewRWFile(kTestFilePath, &f)); + ASSERT_OK(f->PreAllocate(0, kFileSizeBytes, RWFile::CHANGE_FILE_SIZE)); + ASSERT_OK(f->Sync()); + + // The number and distribution of extents differs depending on the + // filesystem; this just provides coverage of the code path. + RWFile::ExtentMap extents; + Status s = f->GetExtentMap(&extents); + if (s.IsNotSupported()) { + LOG(INFO) << "GetExtentMap() not supported, skipping test"; + return; + } + ASSERT_OK(s); + SCOPED_TRACE(extents); + int num_extents = extents.size(); + ASSERT_GT(num_extents, 0) << + "There should have been at least one extent in the file"; + + uint64_t fs_block_size; + ASSERT_OK(env_->GetBlockSize(kTestFilePath, &fs_block_size)); + + // Look for an extent to punch. We want an extent that's at least three times + // the block size so that we can punch out the "middle" fs block and thus + // split the extent in half. + uint64_t found_offset = 0; + for (const auto& e : extents) { + if (e.second >= (fs_block_size * 3)) { + found_offset = e.first + fs_block_size; + break; + } + } + ASSERT_GT(found_offset, 0) << "Couldn't find extent to split"; + + // Punch out a hole and split the extent. 
+ s = f->PunchHole(found_offset, fs_block_size); + if (s.IsNotSupported()) { + LOG(INFO) << "PunchHole() not supported, skipping this part of the test"; + return; + } + ASSERT_OK(s); + ASSERT_OK(f->Sync()); + + // Test the extent map; there should be one more extent. + ASSERT_OK(f->GetExtentMap(&extents)); + ASSERT_EQ(num_extents + 1, extents.size()) << + "Punching a hole should have increased the number of extents by one"; +} + +} // namespace kudu http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/d6abb29d/be/src/kudu/util/env.cc ---------------------------------------------------------------------- diff --git a/be/src/kudu/util/env.cc b/be/src/kudu/util/env.cc new file mode 100644 index 0000000..1f6478c --- /dev/null +++ b/be/src/kudu/util/env.cc @@ -0,0 +1,90 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "kudu/util/env.h" + +#include <memory> + +#include "kudu/util/faststring.h" + +using std::unique_ptr; + +namespace kudu { + +Env::~Env() { +} + +SequentialFile::~SequentialFile() { +} + +RandomAccessFile::~RandomAccessFile() { +} + +WritableFile::~WritableFile() { +} + +RWFile::~RWFile() { +} + +FileLock::~FileLock() { +} + +static Status DoWriteStringToFile(Env* env, const Slice& data, + const std::string& fname, + bool should_sync) { + unique_ptr<WritableFile> file; + Status s = env->NewWritableFile(fname, &file); + if (!s.ok()) { + return s; + } + s = file->Append(data); + if (s.ok() && should_sync) { + s = file->Sync(); + } + if (s.ok()) { + s = file->Close(); + } + file.reset(); // Will auto-close if we did not close above + if (!s.ok()) { + WARN_NOT_OK(env->DeleteFile(fname), + "Failed to delete partially-written file " + fname); + } + return s; +} + +// TODO: move these utils into env_util +Status WriteStringToFile(Env* env, const Slice& data, + const std::string& fname) { + return DoWriteStringToFile(env, data, fname, false); +} + +Status WriteStringToFileSync(Env* env, const Slice& data, + const std::string& fname) { + return DoWriteStringToFile(env, data, fname, true); +} + +Status ReadFileToString(Env* env, const std::string& fname, faststring* data) { + data->clear(); + unique_ptr<SequentialFile> file; + Status s = env->NewSequentialFile(fname, &file); + if (!s.ok()) { + return s; + } + static const int kBufferSize = 8192; + unique_ptr<uint8_t[]> scratch(new uint8_t[kBufferSize]); + while (true) { + Slice fragment(scratch.get(), kBufferSize); + s = file->Read(&fragment); + if (!s.ok()) { + break; + } + data->append(fragment.data(), fragment.size()); + if (fragment.empty()) { + break; + } + } + return s; +} + +} // namespace kudu
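As a quick illustration of the helpers added in env.cc above, here is a minimal stand-alone sketch (not part of the patch; the scratch path and main() harness are made up for illustration) that writes a string durably with WriteStringToFileSync() and reads it back with ReadFileToString(), which internally reads the file in 8 KB chunks:

#include <iostream>
#include <string>

#include "kudu/util/env.h"
#include "kudu/util/faststring.h"
#include "kudu/util/status.h"

int main() {
  kudu::Env* env = kudu::Env::Default();
  const std::string path = "/tmp/env_demo.txt";  // hypothetical scratch path

  // Append the data and sync before closing, so the contents survive a crash.
  kudu::Status s = kudu::WriteStringToFileSync(env, "hello kudu", path);
  if (!s.ok()) {
    std::cerr << "write failed: " << s.ToString() << std::endl;
    return 1;
  }

  // Read the whole file back into a faststring buffer.
  kudu::faststring contents;
  s = kudu::ReadFileToString(env, path, &contents);
  if (!s.ok()) {
    std::cerr << "read failed: " << s.ToString() << std::endl;
    return 1;
  }
  std::cout << contents.ToString() << std::endl;
  return 0;
}

On failure both helpers return a non-OK Status rather than aborting, and DoWriteStringToFile() additionally deletes the partially-written file, so callers only ever observe a complete file or no file at all.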
