This is an automated email from the ASF dual-hosted git repository. volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 7d5f5f6caa58c70cddc79f22ca0bf816220e1656 Author: weijie.tong <[email protected]> AuthorDate: Thu Feb 27 19:34:43 2020 +0800 DRILL-7607: support dynamic credit based flow control closes #2000 --- .../native/client/src/protobuf/BitControl.pb.cc | 107 ++-- contrib/native/client/src/protobuf/BitControl.pb.h | 34 ++ contrib/native/client/src/protobuf/BitData.pb.cc | 278 +++++++++- contrib/native/client/src/protobuf/BitData.pb.h | 155 +++++- .../java/org/apache/drill/exec/ExecConstants.java | 8 +- .../drill/exec/ops/AccountingDataTunnel.java | 6 +- .../drill/exec/ops/DataTunnelStatusHandler.java | 65 +++ .../apache/drill/exec/ops/FragmentContextImpl.java | 4 +- .../exec/planner/fragment/SimpleParallelizer.java | 17 +- .../fragment/contrib/SplittingParallelizer.java | 5 +- .../apache/drill/exec/record/RawFragmentBatch.java | 6 + .../apache/drill/exec/rpc/DynamicSemaphore.java | 84 +++ .../org/apache/drill/exec/rpc/data/AckSender.java | 25 +- .../exec/rpc/data/DataDefaultInstanceHandler.java | 4 +- .../apache/drill/exec/rpc/data/DataRpcConfig.java | 6 +- .../exec/rpc/data/DataServerRequestHandler.java | 6 +- .../org/apache/drill/exec/rpc/data/DataTunnel.java | 39 +- .../exec/server/options/SystemOptionManager.java | 3 +- .../exec/work/batch/AbstractDataCollector.java | 5 +- .../drill/exec/work/batch/BaseRawBatchBuffer.java | 4 +- .../exec/work/batch/SpoolingRawBatchBuffer.java | 16 +- .../exec/work/batch/UnlimitedRawBatchBuffer.java | 59 ++- .../drill/exec/work/filter/RuntimeFilterSink.java | 6 +- .../java-exec/src/main/resources/drill-module.conf | 4 + .../drill/exec/rpc/data/TestBitBitKerberos.java | 6 +- .../org/apache/drill/exec/rpc/data/TestBitRpc.java | 111 +++- .../main/java/org/apache/drill/exec/rpc/Acks.java | 5 + .../org/apache/drill/exec/proto/BitControl.java | 133 ++++- .../java/org/apache/drill/exec/proto/BitData.java | 576 ++++++++++++++++++++- .../apache/drill/exec/proto/SchemaBitControl.java | 7 + .../org/apache/drill/exec/proto/SchemaBitData.java | 
111 ++++ protocol/src/main/protobuf/BitControl.proto | 1 + protocol/src/main/protobuf/BitData.proto | 6 + 33 files changed, 1769 insertions(+), 133 deletions(-) diff --git a/contrib/native/client/src/protobuf/BitControl.pb.cc b/contrib/native/client/src/protobuf/BitControl.pb.cc index 3bf9db5..a574273 100644 --- a/contrib/native/client/src/protobuf/BitControl.pb.cc +++ b/contrib/native/client/src/protobuf/BitControl.pb.cc @@ -350,10 +350,12 @@ const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUT GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, incoming_minor_fragment_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, supports_out_of_order_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, is_spooling_), + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::Collector, enable_dynamic_fc_), 0, ~0u, 1, 2, + 3, GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::QueryContextInformation, _has_bits_), GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::control::QueryContextInformation, _internal_metadata_), ~0u, // no _extensions_ @@ -395,10 +397,10 @@ static const ::google::protobuf::internal::MigrationSchema schemas[] GOOGLE_PROT { 29, 35, sizeof(::exec::bit::control::InitializeFragments)}, { 36, 43, sizeof(::exec::bit::control::CustomMessage)}, { 45, 65, sizeof(::exec::bit::control::PlanFragment)}, - { 80, 89, sizeof(::exec::bit::control::Collector)}, - { 93, 102, sizeof(::exec::bit::control::QueryContextInformation)}, - { 106, 114, sizeof(::exec::bit::control::WorkQueueStatus)}, - { 117, 124, sizeof(::exec::bit::control::FinishedReceiver)}, + { 80, 90, sizeof(::exec::bit::control::Collector)}, + { 95, 104, sizeof(::exec::bit::control::QueryContextInformation)}, + { 108, 116, sizeof(::exec::bit::control::WorkQueueStatus)}, + { 119, 126, sizeof(::exec::bit::control::FinishedReceiver)}, }; static 
::google::protobuf::Message const * const file_default_instances[] = { @@ -462,32 +464,33 @@ void AddDescriptorsImpl() { "ls\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(" "\0132).exec.bit.control.QueryContextInforma" "tion\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.contr" - "ol.Collector\"\210\001\n\tCollector\022\"\n\032opposite_m" + "ol.Collector\"\243\001\n\tCollector\022\"\n\032opposite_m" "ajor_fragment_id\030\001 \001(\005\022#\n\027incoming_minor" "_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_o" - "rder\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"w\n\027Query" - "ContextInformation\022\030\n\020query_start_time\030\001" - " \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schem" - "a_name\030\003 \001(\t\022\022\n\nsession_id\030\004 \001(\t\"f\n\017Work" - "QueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.exec.Dri" - "llbitEndpoint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013r" - "eport_time\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010" - "receiver\030\001 \001(\0132\030.exec.bit.FragmentHandle" - "\022(\n\006sender\030\002 \001(\0132\030.exec.bit.FragmentHand" - "le*\206\003\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013" - "\n\007GOODBYE\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020" - "\003\022\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIV" - "ER_FINISHED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022" - "\n\016REQ_BIT_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n" - "\022\024\n\020REQ_QUERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FR" - "AGMENT\020\020\022\016\n\nREQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMEN" - "T_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017" - "RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016" - "\022\017\n\013RESP_CUSTOM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033o" - 
"rg.apache.drill.exec.protoB\nBitControlH\001" + "rder\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\022\031\n\021enabl" + "e_dynamic_fc\030\005 \001(\010\"w\n\027QueryContextInform" + "ation\022\030\n\020query_start_time\030\001 \001(\003\022\021\n\ttime_" + "zone\030\002 \001(\005\022\033\n\023default_schema_name\030\003 \001(\t\022" + "\022\n\nsession_id\030\004 \001(\t\"f\n\017WorkQueueStatus\022(" + "\n\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint" + "\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 " + "\001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(" + "\0132\030.exec.bit.FragmentHandle\022(\n\006sender\030\002 " + "\001(\0132\030.exec.bit.FragmentHandle*\206\003\n\007RpcTyp" + "e\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034" + "\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANC" + "EL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007" + "\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STA" + "TUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY" + "_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nR" + "EQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n" + "\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STAT" + "US\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUST" + "OM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033org.apache.dri" + "ll.exec.protoB\nBitControlH\001" }; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - descriptor, 2000); + descriptor, 2027); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "BitControl.proto", &protobuf_RegisterTypes); ::protobuf_ExecutionProtos_2eproto::AddDescriptors(); @@ -2878,6 +2881,7 @@ const int Collector::kOppositeMajorFragmentIdFieldNumber; const int Collector::kIncomingMinorFragmentFieldNumber; const int Collector::kSupportsOutOfOrderFieldNumber; 
const int Collector::kIsSpoolingFieldNumber; +const int Collector::kEnableDynamicFcFieldNumber; #endif // !defined(_MSC_VER) || _MSC_VER >= 1900 Collector::Collector() @@ -2894,15 +2898,15 @@ Collector::Collector(const Collector& from) incoming_minor_fragment_(from.incoming_minor_fragment_) { _internal_metadata_.MergeFrom(from._internal_metadata_); ::memcpy(&opposite_major_fragment_id_, &from.opposite_major_fragment_id_, - static_cast<size_t>(reinterpret_cast<char*>(&is_spooling_) - - reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(is_spooling_)); + static_cast<size_t>(reinterpret_cast<char*>(&enable_dynamic_fc_) - + reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(enable_dynamic_fc_)); // @@protoc_insertion_point(copy_constructor:exec.bit.control.Collector) } void Collector::SharedCtor() { ::memset(&opposite_major_fragment_id_, 0, static_cast<size_t>( - reinterpret_cast<char*>(&is_spooling_) - - reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(is_spooling_)); + reinterpret_cast<char*>(&enable_dynamic_fc_) - + reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(enable_dynamic_fc_)); } Collector::~Collector() { @@ -2935,10 +2939,10 @@ void Collector::Clear() { incoming_minor_fragment_.Clear(); cached_has_bits = _has_bits_[0]; - if (cached_has_bits & 7u) { + if (cached_has_bits & 15u) { ::memset(&opposite_major_fragment_id_, 0, static_cast<size_t>( - reinterpret_cast<char*>(&is_spooling_) - - reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(is_spooling_)); + reinterpret_cast<char*>(&enable_dynamic_fc_) - + reinterpret_cast<char*>(&opposite_major_fragment_id_)) + sizeof(enable_dynamic_fc_)); } _has_bits_.Clear(); _internal_metadata_.Clear(); @@ -3015,6 +3019,20 @@ bool Collector::MergePartialFromCodedStream( break; } + // optional bool enable_dynamic_fc = 5; + case 5: { + if (static_cast< ::google::protobuf::uint8>(tag) == + static_cast< ::google::protobuf::uint8>(40u /* 40 & 0xFF */)) { + 
set_has_enable_dynamic_fc(); + DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + bool, ::google::protobuf::internal::WireFormatLite::TYPE_BOOL>( + input, &enable_dynamic_fc_))); + } else { + goto handle_unusual; + } + break; + } + default: { handle_unusual: if (tag == 0) { @@ -3068,6 +3086,11 @@ void Collector::SerializeWithCachedSizes( ::google::protobuf::internal::WireFormatLite::WriteBool(4, this->is_spooling(), output); } + // optional bool enable_dynamic_fc = 5; + if (cached_has_bits & 0x00000008u) { + ::google::protobuf::internal::WireFormatLite::WriteBool(5, this->enable_dynamic_fc(), output); + } + if (_internal_metadata_.have_unknown_fields()) { ::google::protobuf::internal::WireFormat::SerializeUnknownFields( _internal_metadata_.unknown_fields(), output); @@ -3111,6 +3134,11 @@ void Collector::SerializeWithCachedSizes( target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(4, this->is_spooling(), target); } + // optional bool enable_dynamic_fc = 5; + if (cached_has_bits & 0x00000008u) { + target = ::google::protobuf::internal::WireFormatLite::WriteBoolToArray(5, this->enable_dynamic_fc(), target); + } + if (_internal_metadata_.have_unknown_fields()) { target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( _internal_metadata_.unknown_fields(), target); @@ -3144,7 +3172,7 @@ size_t Collector::ByteSizeLong() const { total_size += data_size; } - if (_has_bits_[0 / 32] & 7u) { + if (_has_bits_[0 / 32] & 15u) { // optional int32 opposite_major_fragment_id = 1; if (has_opposite_major_fragment_id()) { total_size += 1 + @@ -3162,6 +3190,11 @@ size_t Collector::ByteSizeLong() const { total_size += 1 + 1; } + // optional bool enable_dynamic_fc = 5; + if (has_enable_dynamic_fc()) { + total_size += 1 + 1; + } + } int cached_size = ::google::protobuf::internal::ToCachedSize(total_size); SetCachedSize(cached_size); @@ -3192,7 +3225,7 @@ void Collector::MergeFrom(const Collector& from) { 
incoming_minor_fragment_.MergeFrom(from.incoming_minor_fragment_); cached_has_bits = from._has_bits_[0]; - if (cached_has_bits & 7u) { + if (cached_has_bits & 15u) { if (cached_has_bits & 0x00000001u) { opposite_major_fragment_id_ = from.opposite_major_fragment_id_; } @@ -3202,6 +3235,9 @@ void Collector::MergeFrom(const Collector& from) { if (cached_has_bits & 0x00000004u) { is_spooling_ = from.is_spooling_; } + if (cached_has_bits & 0x00000008u) { + enable_dynamic_fc_ = from.enable_dynamic_fc_; + } _has_bits_[0] |= cached_has_bits; } } @@ -3234,6 +3270,7 @@ void Collector::InternalSwap(Collector* other) { swap(opposite_major_fragment_id_, other->opposite_major_fragment_id_); swap(supports_out_of_order_, other->supports_out_of_order_); swap(is_spooling_, other->is_spooling_); + swap(enable_dynamic_fc_, other->enable_dynamic_fc_); swap(_has_bits_[0], other->_has_bits_[0]); _internal_metadata_.Swap(&other->_internal_metadata_); } diff --git a/contrib/native/client/src/protobuf/BitControl.pb.h b/contrib/native/client/src/protobuf/BitControl.pb.h index abfda10..8ed08df 100644 --- a/contrib/native/client/src/protobuf/BitControl.pb.h +++ b/contrib/native/client/src/protobuf/BitControl.pb.h @@ -1227,6 +1227,13 @@ class Collector : public ::google::protobuf::Message /* @@protoc_insertion_point bool is_spooling() const; void set_is_spooling(bool value); + // optional bool enable_dynamic_fc = 5; + bool has_enable_dynamic_fc() const; + void clear_enable_dynamic_fc(); + static const int kEnableDynamicFcFieldNumber = 5; + bool enable_dynamic_fc() const; + void set_enable_dynamic_fc(bool value); + // @@protoc_insertion_point(class_scope:exec.bit.control.Collector) private: void set_has_opposite_major_fragment_id(); @@ -1235,6 +1242,8 @@ class Collector : public ::google::protobuf::Message /* @@protoc_insertion_point void clear_has_supports_out_of_order(); void set_has_is_spooling(); void clear_has_is_spooling(); + void set_has_enable_dynamic_fc(); + void 
clear_has_enable_dynamic_fc(); ::google::protobuf::internal::InternalMetadataWithArena _internal_metadata_; ::google::protobuf::internal::HasBits<1> _has_bits_; @@ -1244,6 +1253,7 @@ class Collector : public ::google::protobuf::Message /* @@protoc_insertion_point ::google::protobuf::int32 opposite_major_fragment_id_; bool supports_out_of_order_; bool is_spooling_; + bool enable_dynamic_fc_; friend struct ::protobuf_BitControl_2eproto::TableStruct; }; // ------------------------------------------------------------------- @@ -2850,6 +2860,30 @@ inline void Collector::set_is_spooling(bool value) { // @@protoc_insertion_point(field_set:exec.bit.control.Collector.is_spooling) } +// optional bool enable_dynamic_fc = 5; +inline bool Collector::has_enable_dynamic_fc() const { + return (_has_bits_[0] & 0x00000008u) != 0; +} +inline void Collector::set_has_enable_dynamic_fc() { + _has_bits_[0] |= 0x00000008u; +} +inline void Collector::clear_has_enable_dynamic_fc() { + _has_bits_[0] &= ~0x00000008u; +} +inline void Collector::clear_enable_dynamic_fc() { + enable_dynamic_fc_ = false; + clear_has_enable_dynamic_fc(); +} +inline bool Collector::enable_dynamic_fc() const { + // @@protoc_insertion_point(field_get:exec.bit.control.Collector.enable_dynamic_fc) + return enable_dynamic_fc_; +} +inline void Collector::set_enable_dynamic_fc(bool value) { + set_has_enable_dynamic_fc(); + enable_dynamic_fc_ = value; + // @@protoc_insertion_point(field_set:exec.bit.control.Collector.enable_dynamic_fc) +} + // ------------------------------------------------------------------- // QueryContextInformation diff --git a/contrib/native/client/src/protobuf/BitData.pb.cc b/contrib/native/client/src/protobuf/BitData.pb.cc index 2a749a0..58d786a 100644 --- a/contrib/native/client/src/protobuf/BitData.pb.cc +++ b/contrib/native/client/src/protobuf/BitData.pb.cc @@ -46,6 +46,11 @@ class RuntimeFilterBDefDefaultTypeInternal { ::google::protobuf::internal::ExplicitlyConstructed<RuntimeFilterBDef> 
_instance; } _RuntimeFilterBDef_default_instance_; +class AckWithCreditDefaultTypeInternal { + public: + ::google::protobuf::internal::ExplicitlyConstructed<AckWithCredit> + _instance; +} _AckWithCredit_default_instance_; } // namespace data } // namespace bit } // namespace exec @@ -109,14 +114,29 @@ static void InitDefaultsRuntimeFilterBDef() { {{ATOMIC_VAR_INIT(::google::protobuf::internal::SCCInfoBase::kUninitialized), 1, InitDefaultsRuntimeFilterBDef}, { &protobuf_UserBitShared_2eproto::scc_info_QueryId.base,}}; +static void InitDefaultsAckWithCredit() { + GOOGLE_PROTOBUF_VERIFY_VERSION; + + { + void* ptr = &::exec::bit::data::_AckWithCredit_default_instance_; + new (ptr) ::exec::bit::data::AckWithCredit(); + ::google::protobuf::internal::OnShutdownDestroyMessage(ptr); + } + ::exec::bit::data::AckWithCredit::InitAsDefaultInstance(); +} + +::google::protobuf::internal::SCCInfo<0> scc_info_AckWithCredit = + {{ATOMIC_VAR_INIT(::google::protobuf::internal::SCCInfoBase::kUninitialized), 0, InitDefaultsAckWithCredit}, {}}; + void InitDefaults() { ::google::protobuf::internal::InitSCC(&scc_info_BitClientHandshake.base); ::google::protobuf::internal::InitSCC(&scc_info_BitServerHandshake.base); ::google::protobuf::internal::InitSCC(&scc_info_FragmentRecordBatch.base); ::google::protobuf::internal::InitSCC(&scc_info_RuntimeFilterBDef.base); + ::google::protobuf::internal::InitSCC(&scc_info_AckWithCredit.base); } -::google::protobuf::Metadata file_level_metadata[4]; +::google::protobuf::Metadata file_level_metadata[5]; const ::google::protobuf::EnumDescriptor* file_level_enum_descriptors[1]; const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUTE_SECTION_VARIABLE(protodesc_cold) = { @@ -178,12 +198,20 @@ const ::google::protobuf::uint32 TableStruct::offsets[] GOOGLE_PROTOBUF_ATTRIBUT ~0u, 4, 5, + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::data::AckWithCredit, _has_bits_), + 
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::data::AckWithCredit, _internal_metadata_), + ~0u, // no _extensions_ + ~0u, // no _oneof_case_ + ~0u, // no _weak_field_map_ + GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(::exec::bit::data::AckWithCredit, allowed_credit_), + 0, }; static const ::google::protobuf::internal::MigrationSchema schemas[] GOOGLE_PROTOBUF_ATTRIBUTE_SECTION_VARIABLE(protodesc_cold) = { { 0, 7, sizeof(::exec::bit::data::BitClientHandshake)}, { 9, 16, sizeof(::exec::bit::data::BitServerHandshake)}, { 18, 30, sizeof(::exec::bit::data::FragmentRecordBatch)}, { 37, 50, sizeof(::exec::bit::data::RuntimeFilterBDef)}, + { 58, 64, sizeof(::exec::bit::data::AckWithCredit)}, }; static ::google::protobuf::Message const * const file_default_instances[] = { @@ -191,6 +219,7 @@ static ::google::protobuf::Message const * const file_default_instances[] = { reinterpret_cast<const ::google::protobuf::Message*>(&::exec::bit::data::_BitServerHandshake_default_instance_), reinterpret_cast<const ::google::protobuf::Message*>(&::exec::bit::data::_FragmentRecordBatch_default_instance_), reinterpret_cast<const ::google::protobuf::Message*>(&::exec::bit::data::_RuntimeFilterBDef_default_instance_), + reinterpret_cast<const ::google::protobuf::Message*>(&::exec::bit::data::_AckWithCredit_default_instance_), }; void protobuf_AssignDescriptors() { @@ -208,7 +237,7 @@ void protobuf_AssignDescriptorsOnce() { void protobuf_RegisterTypes(const ::std::string&) GOOGLE_PROTOBUF_ATTRIBUTE_COLD; void protobuf_RegisterTypes(const ::std::string&) { protobuf_AssignDescriptorsOnce(); - ::google::protobuf::internal::RegisterAllTypes(file_level_metadata, 4); + ::google::protobuf::internal::RegisterAllTypes(file_level_metadata, 5); } void AddDescriptorsImpl() { @@ -233,14 +262,15 @@ void AddDescriptorsImpl() { "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" "ields\030\006 
\003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" - "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007" - "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" - "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" - "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" - "DataH\001" + "tifier\030\010 \001(\003\"\'\n\rAckWithCredit\022\026\n\016allowed" + "_credit\030\001 \001(\005*\210\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000" + "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BAT" + "CH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FI" + "LTER\020\005\022\030\n\024DATA_ACK_WITH_CREDIT\020\006B(\n\033org." + "apache.drill.exec.protoB\007BitDataH\001" }; ::google::protobuf::DescriptorPool::InternalAddGeneratedFile( - descriptor, 926); + descriptor, 994); ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( "BitData.proto", &protobuf_RegisterTypes); ::protobuf_ExecutionProtos_2eproto::AddDescriptors(); @@ -274,6 +304,7 @@ bool RpcType_IsValid(int value) { case 3: case 4: case 5: + case 6: return true; default: return false; @@ -1874,6 +1905,234 @@ void RuntimeFilterBDef::InternalSwap(RuntimeFilterBDef* other) { } +// =================================================================== + +void AckWithCredit::InitAsDefaultInstance() { +} +#if !defined(_MSC_VER) || _MSC_VER >= 1900 +const int AckWithCredit::kAllowedCreditFieldNumber; +#endif // !defined(_MSC_VER) || _MSC_VER >= 1900 + +AckWithCredit::AckWithCredit() + : ::google::protobuf::Message(), _internal_metadata_(NULL) { + ::google::protobuf::internal::InitSCC( + &protobuf_BitData_2eproto::scc_info_AckWithCredit.base); + SharedCtor(); + // @@protoc_insertion_point(constructor:exec.bit.data.AckWithCredit) +} +AckWithCredit::AckWithCredit(const AckWithCredit& from) + : ::google::protobuf::Message(), + _internal_metadata_(NULL), + _has_bits_(from._has_bits_) 
{ + _internal_metadata_.MergeFrom(from._internal_metadata_); + allowed_credit_ = from.allowed_credit_; + // @@protoc_insertion_point(copy_constructor:exec.bit.data.AckWithCredit) +} + +void AckWithCredit::SharedCtor() { + allowed_credit_ = 0; +} + +AckWithCredit::~AckWithCredit() { + // @@protoc_insertion_point(destructor:exec.bit.data.AckWithCredit) + SharedDtor(); +} + +void AckWithCredit::SharedDtor() { +} + +void AckWithCredit::SetCachedSize(int size) const { + _cached_size_.Set(size); +} +const ::google::protobuf::Descriptor* AckWithCredit::descriptor() { + ::protobuf_BitData_2eproto::protobuf_AssignDescriptorsOnce(); + return ::protobuf_BitData_2eproto::file_level_metadata[kIndexInFileMessages].descriptor; +} + +const AckWithCredit& AckWithCredit::default_instance() { + ::google::protobuf::internal::InitSCC(&protobuf_BitData_2eproto::scc_info_AckWithCredit.base); + return *internal_default_instance(); +} + + +void AckWithCredit::Clear() { +// @@protoc_insertion_point(message_clear_start:exec.bit.data.AckWithCredit) + ::google::protobuf::uint32 cached_has_bits = 0; + // Prevent compiler warnings about cached_has_bits being unused + (void) cached_has_bits; + + allowed_credit_ = 0; + _has_bits_.Clear(); + _internal_metadata_.Clear(); +} + +bool AckWithCredit::MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) { +#define DO_(EXPRESSION) if (!GOOGLE_PREDICT_TRUE(EXPRESSION)) goto failure + ::google::protobuf::uint32 tag; + // @@protoc_insertion_point(parse_start:exec.bit.data.AckWithCredit) + for (;;) { + ::std::pair<::google::protobuf::uint32, bool> p = input->ReadTagWithCutoffNoLastTag(127u); + tag = p.first; + if (!p.second) goto handle_unusual; + switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) { + // optional int32 allowed_credit = 1; + case 1: { + if (static_cast< ::google::protobuf::uint8>(tag) == + static_cast< ::google::protobuf::uint8>(8u /* 8 & 0xFF */)) { + set_has_allowed_credit(); + 
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive< + ::google::protobuf::int32, ::google::protobuf::internal::WireFormatLite::TYPE_INT32>( + input, &allowed_credit_))); + } else { + goto handle_unusual; + } + break; + } + + default: { + handle_unusual: + if (tag == 0) { + goto success; + } + DO_(::google::protobuf::internal::WireFormat::SkipField( + input, tag, _internal_metadata_.mutable_unknown_fields())); + break; + } + } + } +success: + // @@protoc_insertion_point(parse_success:exec.bit.data.AckWithCredit) + return true; +failure: + // @@protoc_insertion_point(parse_failure:exec.bit.data.AckWithCredit) + return false; +#undef DO_ +} + +void AckWithCredit::SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const { + // @@protoc_insertion_point(serialize_start:exec.bit.data.AckWithCredit) + ::google::protobuf::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + cached_has_bits = _has_bits_[0]; + // optional int32 allowed_credit = 1; + if (cached_has_bits & 0x00000001u) { + ::google::protobuf::internal::WireFormatLite::WriteInt32(1, this->allowed_credit(), output); + } + + if (_internal_metadata_.have_unknown_fields()) { + ::google::protobuf::internal::WireFormat::SerializeUnknownFields( + _internal_metadata_.unknown_fields(), output); + } + // @@protoc_insertion_point(serialize_end:exec.bit.data.AckWithCredit) +} + +::google::protobuf::uint8* AckWithCredit::InternalSerializeWithCachedSizesToArray( + bool deterministic, ::google::protobuf::uint8* target) const { + (void)deterministic; // Unused + // @@protoc_insertion_point(serialize_to_array_start:exec.bit.data.AckWithCredit) + ::google::protobuf::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + cached_has_bits = _has_bits_[0]; + // optional int32 allowed_credit = 1; + if (cached_has_bits & 0x00000001u) { + target = ::google::protobuf::internal::WireFormatLite::WriteInt32ToArray(1, this->allowed_credit(), target); + } + + if 
(_internal_metadata_.have_unknown_fields()) { + target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray( + _internal_metadata_.unknown_fields(), target); + } + // @@protoc_insertion_point(serialize_to_array_end:exec.bit.data.AckWithCredit) + return target; +} + +size_t AckWithCredit::ByteSizeLong() const { +// @@protoc_insertion_point(message_byte_size_start:exec.bit.data.AckWithCredit) + size_t total_size = 0; + + if (_internal_metadata_.have_unknown_fields()) { + total_size += + ::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize( + _internal_metadata_.unknown_fields()); + } + // optional int32 allowed_credit = 1; + if (has_allowed_credit()) { + total_size += 1 + + ::google::protobuf::internal::WireFormatLite::Int32Size( + this->allowed_credit()); + } + + int cached_size = ::google::protobuf::internal::ToCachedSize(total_size); + SetCachedSize(cached_size); + return total_size; +} + +void AckWithCredit::MergeFrom(const ::google::protobuf::Message& from) { +// @@protoc_insertion_point(generalized_merge_from_start:exec.bit.data.AckWithCredit) + GOOGLE_DCHECK_NE(&from, this); + const AckWithCredit* source = + ::google::protobuf::internal::DynamicCastToGenerated<const AckWithCredit>( + &from); + if (source == NULL) { + // @@protoc_insertion_point(generalized_merge_from_cast_fail:exec.bit.data.AckWithCredit) + ::google::protobuf::internal::ReflectionOps::Merge(from, this); + } else { + // @@protoc_insertion_point(generalized_merge_from_cast_success:exec.bit.data.AckWithCredit) + MergeFrom(*source); + } +} + +void AckWithCredit::MergeFrom(const AckWithCredit& from) { +// @@protoc_insertion_point(class_specific_merge_from_start:exec.bit.data.AckWithCredit) + GOOGLE_DCHECK_NE(&from, this); + _internal_metadata_.MergeFrom(from._internal_metadata_); + ::google::protobuf::uint32 cached_has_bits = 0; + (void) cached_has_bits; + + if (from.has_allowed_credit()) { + set_allowed_credit(from.allowed_credit()); + } +} + +void 
AckWithCredit::CopyFrom(const ::google::protobuf::Message& from) { +// @@protoc_insertion_point(generalized_copy_from_start:exec.bit.data.AckWithCredit) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +void AckWithCredit::CopyFrom(const AckWithCredit& from) { +// @@protoc_insertion_point(class_specific_copy_from_start:exec.bit.data.AckWithCredit) + if (&from == this) return; + Clear(); + MergeFrom(from); +} + +bool AckWithCredit::IsInitialized() const { + return true; +} + +void AckWithCredit::Swap(AckWithCredit* other) { + if (other == this) return; + InternalSwap(other); +} +void AckWithCredit::InternalSwap(AckWithCredit* other) { + using std::swap; + swap(allowed_credit_, other->allowed_credit_); + swap(_has_bits_[0], other->_has_bits_[0]); + _internal_metadata_.Swap(&other->_internal_metadata_); +} + +::google::protobuf::Metadata AckWithCredit::GetMetadata() const { + protobuf_BitData_2eproto::protobuf_AssignDescriptorsOnce(); + return ::protobuf_BitData_2eproto::file_level_metadata[kIndexInFileMessages]; +} + + // @@protoc_insertion_point(namespace_scope) } // namespace data } // namespace bit @@ -1892,6 +2151,9 @@ template<> GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE ::exec::bit::data::FragmentRecordB template<> GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE ::exec::bit::data::RuntimeFilterBDef* Arena::CreateMaybeMessage< ::exec::bit::data::RuntimeFilterBDef >(Arena* arena) { return Arena::CreateInternal< ::exec::bit::data::RuntimeFilterBDef >(arena); } +template<> GOOGLE_PROTOBUF_ATTRIBUTE_NOINLINE ::exec::bit::data::AckWithCredit* Arena::CreateMaybeMessage< ::exec::bit::data::AckWithCredit >(Arena* arena) { + return Arena::CreateInternal< ::exec::bit::data::AckWithCredit >(arena); +} } // namespace protobuf } // namespace google diff --git a/contrib/native/client/src/protobuf/BitData.pb.h b/contrib/native/client/src/protobuf/BitData.pb.h index 805c498..80d3b61 100644 --- a/contrib/native/client/src/protobuf/BitData.pb.h +++ 
b/contrib/native/client/src/protobuf/BitData.pb.h @@ -42,7 +42,7 @@ namespace protobuf_BitData_2eproto { struct TableStruct { static const ::google::protobuf::internal::ParseTableField entries[]; static const ::google::protobuf::internal::AuxillaryParseTableField aux[]; - static const ::google::protobuf::internal::ParseTable schema[4]; + static const ::google::protobuf::internal::ParseTable schema[5]; static const ::google::protobuf::internal::FieldMetadata field_metadata[]; static const ::google::protobuf::internal::SerializationTable serialization_table[]; static const ::google::protobuf::uint32 offsets[]; @@ -52,6 +52,9 @@ void AddDescriptors(); namespace exec { namespace bit { namespace data { +class AckWithCredit; +class AckWithCreditDefaultTypeInternal; +extern AckWithCreditDefaultTypeInternal _AckWithCredit_default_instance_; class BitClientHandshake; class BitClientHandshakeDefaultTypeInternal; extern BitClientHandshakeDefaultTypeInternal _BitClientHandshake_default_instance_; @@ -69,6 +72,7 @@ extern RuntimeFilterBDefDefaultTypeInternal _RuntimeFilterBDef_default_instance_ } // namespace exec namespace google { namespace protobuf { +template<> ::exec::bit::data::AckWithCredit* Arena::CreateMaybeMessage<::exec::bit::data::AckWithCredit>(Arena*); template<> ::exec::bit::data::BitClientHandshake* Arena::CreateMaybeMessage<::exec::bit::data::BitClientHandshake>(Arena*); template<> ::exec::bit::data::BitServerHandshake* Arena::CreateMaybeMessage<::exec::bit::data::BitServerHandshake>(Arena*); template<> ::exec::bit::data::FragmentRecordBatch* Arena::CreateMaybeMessage<::exec::bit::data::FragmentRecordBatch>(Arena*); @@ -85,11 +89,12 @@ enum RpcType { GOODBYE = 2, REQ_RECORD_BATCH = 3, SASL_MESSAGE = 4, - REQ_RUNTIME_FILTER = 5 + REQ_RUNTIME_FILTER = 5, + DATA_ACK_WITH_CREDIT = 6 }; bool RpcType_IsValid(int value); const RpcType RpcType_MIN = HANDSHAKE; -const RpcType RpcType_MAX = REQ_RUNTIME_FILTER; +const RpcType RpcType_MAX = DATA_ACK_WITH_CREDIT; const int 
RpcType_ARRAYSIZE = RpcType_MAX + 1; const ::google::protobuf::EnumDescriptor* RpcType_descriptor(); @@ -755,6 +760,120 @@ class RuntimeFilterBDef : public ::google::protobuf::Message /* @@protoc_inserti ::google::protobuf::int64 rf_identifier_; friend struct ::protobuf_BitData_2eproto::TableStruct; }; +// ------------------------------------------------------------------- + +class AckWithCredit : public ::google::protobuf::Message /* @@protoc_insertion_point(class_definition:exec.bit.data.AckWithCredit) */ { + public: + AckWithCredit(); + virtual ~AckWithCredit(); + + AckWithCredit(const AckWithCredit& from); + + inline AckWithCredit& operator=(const AckWithCredit& from) { + CopyFrom(from); + return *this; + } + #if LANG_CXX11 + AckWithCredit(AckWithCredit&& from) noexcept + : AckWithCredit() { + *this = ::std::move(from); + } + + inline AckWithCredit& operator=(AckWithCredit&& from) noexcept { + if (GetArenaNoVirtual() == from.GetArenaNoVirtual()) { + if (this != &from) InternalSwap(&from); + } else { + CopyFrom(from); + } + return *this; + } + #endif + inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const { + return _internal_metadata_.unknown_fields(); + } + inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() { + return _internal_metadata_.mutable_unknown_fields(); + } + + static const ::google::protobuf::Descriptor* descriptor(); + static const AckWithCredit& default_instance(); + + static void InitAsDefaultInstance(); // FOR INTERNAL USE ONLY + static inline const AckWithCredit* internal_default_instance() { + return reinterpret_cast<const AckWithCredit*>( + &_AckWithCredit_default_instance_); + } + static constexpr int kIndexInFileMessages = + 4; + + void Swap(AckWithCredit* other); + friend void swap(AckWithCredit& a, AckWithCredit& b) { + a.Swap(&b); + } + + // implements Message ---------------------------------------------- + + inline AckWithCredit* New() const final { + return CreateMaybeMessage<AckWithCredit>(NULL); 
+ } + + AckWithCredit* New(::google::protobuf::Arena* arena) const final { + return CreateMaybeMessage<AckWithCredit>(arena); + } + void CopyFrom(const ::google::protobuf::Message& from) final; + void MergeFrom(const ::google::protobuf::Message& from) final; + void CopyFrom(const AckWithCredit& from); + void MergeFrom(const AckWithCredit& from); + void Clear() final; + bool IsInitialized() const final; + + size_t ByteSizeLong() const final; + bool MergePartialFromCodedStream( + ::google::protobuf::io::CodedInputStream* input) final; + void SerializeWithCachedSizes( + ::google::protobuf::io::CodedOutputStream* output) const final; + ::google::protobuf::uint8* InternalSerializeWithCachedSizesToArray( + bool deterministic, ::google::protobuf::uint8* target) const final; + int GetCachedSize() const final { return _cached_size_.Get(); } + + private: + void SharedCtor(); + void SharedDtor(); + void SetCachedSize(int size) const final; + void InternalSwap(AckWithCredit* other); + private: + inline ::google::protobuf::Arena* GetArenaNoVirtual() const { + return NULL; + } + inline void* MaybeArenaPtr() const { + return NULL; + } + public: + + ::google::protobuf::Metadata GetMetadata() const final; + + // nested types ---------------------------------------------------- + + // accessors ------------------------------------------------------- + + // optional int32 allowed_credit = 1; + bool has_allowed_credit() const; + void clear_allowed_credit(); + static const int kAllowedCreditFieldNumber = 1; + ::google::protobuf::int32 allowed_credit() const; + void set_allowed_credit(::google::protobuf::int32 value); + + // @@protoc_insertion_point(class_scope:exec.bit.data.AckWithCredit) + private: + void set_has_allowed_credit(); + void clear_has_allowed_credit(); + + ::google::protobuf::internal::InternalMetadataWithArena _internal_metadata_; + ::google::protobuf::internal::HasBits<1> _has_bits_; + mutable ::google::protobuf::internal::CachedSize _cached_size_; + 
::google::protobuf::int32 allowed_credit_; + friend struct ::protobuf_BitData_2eproto::TableStruct; +}; // =================================================================== @@ -1427,6 +1546,34 @@ inline void RuntimeFilterBDef::set_rf_identifier(::google::protobuf::int64 value // @@protoc_insertion_point(field_set:exec.bit.data.RuntimeFilterBDef.rf_identifier) } +// ------------------------------------------------------------------- + +// AckWithCredit + +// optional int32 allowed_credit = 1; +inline bool AckWithCredit::has_allowed_credit() const { + return (_has_bits_[0] & 0x00000001u) != 0; +} +inline void AckWithCredit::set_has_allowed_credit() { + _has_bits_[0] |= 0x00000001u; +} +inline void AckWithCredit::clear_has_allowed_credit() { + _has_bits_[0] &= ~0x00000001u; +} +inline void AckWithCredit::clear_allowed_credit() { + allowed_credit_ = 0; + clear_has_allowed_credit(); +} +inline ::google::protobuf::int32 AckWithCredit::allowed_credit() const { + // @@protoc_insertion_point(field_get:exec.bit.data.AckWithCredit.allowed_credit) + return allowed_credit_; +} +inline void AckWithCredit::set_allowed_credit(::google::protobuf::int32 value) { + set_has_allowed_credit(); + allowed_credit_ = value; + // @@protoc_insertion_point(field_set:exec.bit.data.AckWithCredit.allowed_credit) +} + #ifdef __GNUC__ #pragma GCC diagnostic pop #endif // __GNUC__ @@ -1436,6 +1583,8 @@ inline void RuntimeFilterBDef::set_rf_identifier(::google::protobuf::int64 value // ------------------------------------------------------------------- +// ------------------------------------------------------------------- + // @@protoc_insertion_point(namespace_scope) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 2619cd4..d0a4718 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -77,6 +77,7 @@ public final class ExecConstants { public static final String INCOMING_BUFFER_SIZE = "drill.exec.buffer.size"; public static final String SPOOLING_BUFFER_DELETE = "drill.exec.buffer.spooling.delete"; public static final String SPOOLING_BUFFER_MEMORY = "drill.exec.buffer.spooling.size"; + public static final String UNLIMITED_BUFFER_MAX_MEMORY_SIZE = "drill.exec.buffer.unlimited_receiver.max_size"; public static final String BATCH_PURGE_THRESHOLD = "drill.exec.sort.purge.threshold"; // Spill boot-time Options common to all spilling operators @@ -1152,7 +1153,6 @@ public final class ExecConstants { public static final BooleanValidator PARQUET_READER_ENABLE_MAP_SUPPORT_VALIDATOR = new BooleanValidator( PARQUET_READER_ENABLE_MAP_SUPPORT, new OptionDescription("Enables Drill Parquet reader to read Parquet MAP type correctly. (Drill 1.17+)")); - // --------------------------------------- // Storage-plugin related config constants // Bootstrap plugin files configuration keys @@ -1166,4 +1166,10 @@ public final class ExecConstants { // Extra private plugin classes, used for testing public static final String PRIVATE_CONNECTORS = "drill.exec.storage.private_connectors"; + + public static final String ENABLE_DYNAMIC_CREDIT_BASED_FC = "exec.enable_dynamic_fc"; + public static final BooleanValidator ENABLE_DYNAMIC_CREDIT_BASED_FC_VALIDATOR = new BooleanValidator( + ENABLE_DYNAMIC_CREDIT_BASED_FC, new OptionDescription("Enable dynamic credit based flow control.This feature allows " + + "the sender to send out its data more rapidly, but you should know that it has a risk to OOM when the system is solving parallel " + + "large queries until we have a more accurate resource manager.")); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java index 484629b..67d78e5 100644 
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/AccountingDataTunnel.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.ops; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.data.DataTunnel; @@ -33,9 +33,9 @@ import org.apache.drill.exec.work.filter.RuntimeFilterWritable; public class AccountingDataTunnel { private final DataTunnel tunnel; private final SendingAccountor sendingAccountor; - private final RpcOutcomeListener<Ack> statusHandler; + private final RpcOutcomeListener<BitData.AckWithCredit> statusHandler; - public AccountingDataTunnel(DataTunnel tunnel, SendingAccountor sendingAccountor, RpcOutcomeListener<Ack> statusHandler) { + public AccountingDataTunnel(DataTunnel tunnel, SendingAccountor sendingAccountor, RpcOutcomeListener<BitData.AckWithCredit> statusHandler) { this.tunnel = tunnel; this.sendingAccountor = sendingAccountor; this.statusHandler = statusHandler; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java new file mode 100644 index 0000000..e78cda9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/DataTunnelStatusHandler.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import io.netty.buffer.ByteBuf; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.rpc.Acks; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Listener that keeps track of the status of batches sent, and updates the SendingAccountor when status is received + * for each batch + */ +public class DataTunnelStatusHandler implements RpcOutcomeListener<BitData.AckWithCredit> { + private static final Logger logger = LoggerFactory.getLogger(DataTunnelStatusHandler.class); + private final SendingAccountor sendingAccountor; + private final Consumer<RpcException> consumer; + + public DataTunnelStatusHandler(Consumer<RpcException> consumer, SendingAccountor sendingAccountor) { + this.consumer = consumer; + this.sendingAccountor = sendingAccountor; + } + + @Override + public void failed(RpcException ex) { + sendingAccountor.decrement(); + consumer.accept(ex); + } + + @Override + public void success(BitData.AckWithCredit value, ByteBuf buffer) { + sendingAccountor.decrement(); + if (value.getAllowedCredit() != Acks.FAIL_CREDIT) { + return; + } + + logger.error("Data not accepted downstream. Stopping future sends. The receiver has failed to solve the query"); + // if we didn't get ack ok, we'll need to kill the query. 
+ consumer.accept(new RpcException("Data not accepted downstream.")); + } + + @Override + public void interrupted(final InterruptedException e) { + sendingAccountor.decrement(); + consumer.interrupt(e); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index 653241c..42265e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -49,6 +49,7 @@ import org.apache.drill.exec.physical.impl.OperatorCreatorRegistry; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -153,6 +154,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor }; private final RpcOutcomeListener<Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); + private final RpcOutcomeListener<BitData.AckWithCredit> dataTunnelStatusHandler = new DataTunnelStatusHandler(exceptionConsumer, sendingAccountor); private final AccountingUserConnection accountingUserConnection; /** Stores constants and their holders by type */ private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; @@ -477,7 +479,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor public AccountingDataTunnel getDataTunnel(final DrillbitEndpoint endpoint) { AccountingDataTunnel tunnel = tunnels.get(endpoint); if (tunnel == null) { - tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), 
sendingAccountor, statusHandler); + tunnel = new AccountingDataTunnel(context.getDataConnectionsPool().getTunnel(endpoint), sendingAccountor, dataTunnelStatusHandler); tunnels.put(endpoint, tunnel); } return tunnel; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index a434bf8..0d3529b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -71,6 +71,7 @@ public abstract class SimpleParallelizer implements QueryParallelizer { private final int maxWidthPerNode; private final int maxGlobalWidth; private final double affinityFactor; + private boolean enableDynamicFC; protected SimpleParallelizer(QueryContext context) { OptionManager optionManager = context.getOptions(); @@ -82,6 +83,7 @@ public abstract class SimpleParallelizer implements QueryParallelizer { this.maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average,maxWidth); this.maxGlobalWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue(); this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); + this.enableDynamicFC = optionManager.getBoolean(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC); } protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { @@ -312,7 +314,6 @@ public abstract class SimpleParallelizer implements QueryParallelizer { } // a fragment is self driven if it doesn't rely on any other exchanges. boolean isLeafFragment = node.getReceivingExchangePairs().size() == 0; - // Create a minorFragment for each major fragment. 
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) { IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper, @@ -338,7 +339,7 @@ public abstract class SimpleParallelizer implements QueryParallelizer { .setMemInitial(wrapper.getInitialAllocation()) .setMemMax(wrapper.getMaxAllocation()) .setCredentials(session.getCredentials()) - .addAllCollector(CountRequiredFragments.getCollectors(root)) + .addAllCollector(CountRequiredFragments.getCollectors(root, enableDynamicFC)) .build(); MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options); @@ -362,11 +363,16 @@ public abstract class SimpleParallelizer implements QueryParallelizer { */ protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> { - private static final CountRequiredFragments INSTANCE = new CountRequiredFragments(); + private boolean enableDynamicFC; + + CountRequiredFragments(boolean enableDynamicFC) { + this.enableDynamicFC = enableDynamicFC; + } - public static List<Collector> getCollectors(PhysicalOperator root) { + public static List<Collector> getCollectors(PhysicalOperator root, boolean enableDynamicFC) { List<Collector> collectors = Lists.newArrayList(); - root.accept(INSTANCE, collectors); + CountRequiredFragments countRequiredFragments = new CountRequiredFragments(enableDynamicFC); + root.accept(countRequiredFragments, collectors); return collectors; } @@ -382,6 +388,7 @@ public abstract class SimpleParallelizer implements QueryParallelizer { .setIsSpooling(receiver.isSpooling()) .setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId()) .setSupportsOutOfOrder(receiver.supportsOutOfOrderExchange()) + .setEnableDynamicFc(enableDynamicFC) .addAllIncomingMinorFragment(list) .build()); return null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java index c1250e3..25d32fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java @@ -24,6 +24,7 @@ import java.util.Set; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.FragmentRoot; @@ -60,9 +61,11 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; public class SplittingParallelizer extends DefaultQueryParallelizer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class); + private boolean enableDynamicFC; public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) { super(doMemoryPlanning, context); + this.enableDynamicFC = context.getOptions().getBoolean(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC); } /** @@ -204,7 +207,7 @@ public class SplittingParallelizer extends DefaultQueryParallelizer { .setMemInitial(initialAllocation)// .setMemMax(wrapper.getMaxAllocation()) // TODO - for some reason OOM is using leaf fragment max allocation divided by width .setCredentials(session.getCredentials()) - .addAllCollector(CountRequiredFragments.getCollectors(root)) + .addAllCollector(CountRequiredFragments.getCollectors(root, enableDynamicFC)) .build(); MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java index 7ba247a..2bbe566 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java @@ -70,6 +70,12 @@ public class RawFragmentBatch { } } + public synchronized void sendOk(int suggestedCredit) { + if (sender != null && ackSent.compareAndSet(false, true)) { + sender.sendOk(suggestedCredit); + } + } + public long getByteCount() { return body == null ? 0 : body.readableBytes(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DynamicSemaphore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DynamicSemaphore.java new file mode 100644 index 0000000..0847cef --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/DynamicSemaphore.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Dynamic credit based flow control: + * The sender initially sends batches to the receiver using the initial static credit (3).
+ * The receiver will calculate a runtime credit value while sampling some received batches. The runtime + * generated credit value will be sent back to the sender as an ack value. The sender will switch to the + * runtime generated credit value once it receives the ack credit value. + * + * The ack credit value has three kinds of numeric values with different meanings: + * 0 : no explicit credit value for the sender, just keep the initial static credit based fc. + * -1: the receiver has failed to process the query. + * value greater than 0: a runtime generated credit value for the sender to change to. + */ +public class DynamicSemaphore { + + private static final int INITIAL_STATIC_CREDIT = 3; + private final Semaphore semaphore = new Semaphore(INITIAL_STATIC_CREDIT); + private final AtomicBoolean changed = new AtomicBoolean(false); + private final ReentrantLock lock = new ReentrantLock(); + private int firstReceivedCredit = -1; + + public DynamicSemaphore() { + + } + + public void acquire() throws InterruptedException { + semaphore.acquire(); + } + + public void release() { + semaphore.release(); + } + + /** + * received an advice credit to transfer from the + * initial static value + * + * @param suggestedSemaphoreVal + */ + public void tryToIncreaseCredit(int suggestedSemaphoreVal) { + if (suggestedSemaphoreVal < INITIAL_STATIC_CREDIT) { + return; + } + if (changed.get()) { + return; + } + try { + lock.lock(); + if (suggestedSemaphoreVal > firstReceivedCredit) { + firstReceivedCredit = suggestedSemaphoreVal; + } else { + return; + } + int increasedSemaphoreNumber = suggestedSemaphoreVal - INITIAL_STATIC_CREDIT; + //meaning increase the semaphore + semaphore.release(increasedSemaphoreNumber); + changed.compareAndSet(false, true); + } finally { + lock.unlock(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java index
5be29ee..70251be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/AckSender.java @@ -19,6 +19,9 @@ package org.apache.drill.exec.rpc.data; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.rpc.Acks; +import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; @@ -31,6 +34,7 @@ public class AckSender { private AtomicInteger count = new AtomicInteger(0); private ResponseSender sender; + private int everLargestAdviceCredit = Acks.NO_SUGGESTED_CREDIT; @VisibleForTesting public AckSender(ResponseSender sender) { @@ -56,8 +60,27 @@ public class AckSender { * response upstream. */ public void sendOk() { + sendOk(Acks.NO_SUGGESTED_CREDIT); + } + + /** + * Decrement the number of references still holding on to this response. When the number of references hit zero, send + * response upstream. 
Ack for the dynamic credit model + * credit == -1 means Fail + * @param credit suggested credit value + */ + public void sendOk(int credit) { + everLargestAdviceCredit = Math.max(everLargestAdviceCredit, credit); if (0 == count.decrementAndGet()) { - sender.send(DataRpcConfig.OK); + BitData.AckWithCredit ackWithCredit = BitData.AckWithCredit.newBuilder().setAllowedCredit(everLargestAdviceCredit).build(); + Response ackResponse = new Response(BitData.RpcType.DATA_ACK_WITH_CREDIT, ackWithCredit); + sender.send(ackResponse); } } + + public void sendFail() { + BitData.AckWithCredit ackWithCredit = BitData.AckWithCredit.newBuilder().setAllowedCredit(Acks.FAIL_CREDIT).build(); + Response ackFailResponse = new Response(BitData.RpcType.DATA_ACK_WITH_CREDIT, ackWithCredit); + sender.send(ackFailResponse); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java index 12a575c..76c0acd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; @@ -38,7 +39,8 @@ public class DataDefaultInstanceHandler { return BitServerHandshake.getDefaultInstance(); case RpcType.SASL_MESSAGE_VALUE: return SaslMessage.getDefaultInstance(); - + case RpcType.DATA_ACK_WITH_CREDIT_VALUE: + return BitData.AckWithCredit.getDefaultInstance(); default: throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java index 38e8771..1c70526 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java @@ -21,12 +21,12 @@ import java.util.concurrent.Executor; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.BitData.RuntimeFilterBDef; import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.SaslMessage; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.Response; @@ -41,9 +41,9 @@ public class DataRpcConfig { .executor(executor) .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT)) .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class) - .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.DATA_ACK_WITH_CREDIT, BitData.AckWithCredit.class) .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class) - .add(RpcType.REQ_RUNTIME_FILTER, RuntimeFilterBDef.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_RUNTIME_FILTER, RuntimeFilterBDef.class, RpcType.DATA_ACK_WITH_CREDIT, BitData.AckWithCredit.class) .build(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java index 5ad7ba4..896474f 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java @@ -25,9 +25,7 @@ import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.RequestHandler; -import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcBus; import org.apache.drill.exec.rpc.RpcException; @@ -94,7 +92,7 @@ class DataServerRequestHandler implements RequestHandler<DataServerConnection> { fragmentBatch.getReceivingMajorFragmentId(), fragmentBatch.getReceivingMinorFragmentIdList()), e); ack.clear(); - sender.send(new Response(BitData.RpcType.ACK, Acks.FAIL)); + ack.sendFail(); } finally { // decrement the extra reference we grabbed at the top. 
@@ -127,7 +125,7 @@ class DataServerRequestHandler implements RequestHandler<DataServerConnection> { logger.error("error to solve received runtime filter, {}", QueryIdHelper.getQueryId(runtimeFilterBDef.getQueryId()), e); ackSender.clear(); - sender.send(new Response(BitData.RpcType.ACK, Acks.FAIL)); + ackSender.sendFail(); } finally { ackSender.sendOk(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 188cfbc..f60d668 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -19,10 +19,10 @@ package org.apache.drill.exec.rpc.data; import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; -import java.util.concurrent.Semaphore; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.record.FragmentWritableBatch; +import org.apache.drill.exec.rpc.DynamicSemaphore; import org.apache.drill.exec.rpc.ListeningCommand; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; @@ -35,7 +35,7 @@ public class DataTunnel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataTunnel.class); private final DataConnectionManager manager; - private final Semaphore sendingSemaphore = new Semaphore(3); + private final DynamicSemaphore sendingSemaphore = new DynamicSemaphore(); // Needed for injecting a test pause private boolean isInjectionControlSet; @@ -66,7 +66,7 @@ public class DataTunnel { this.testLogger = testLogger; } - public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) { + public void sendRecordBatch(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, FragmentWritableBatch 
batch) { SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch); try { if (isInjectionControlSet) { @@ -91,7 +91,7 @@ public class DataTunnel { } } - public void sendRuntimeFilter(RpcOutcomeListener<Ack> outcomeListener, RuntimeFilterWritable runtimeFilter) { + public void sendRuntimeFilter(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, RuntimeFilterWritable runtimeFilter) { SendRuntimeFilterAsyncListen cmd = new SendRuntimeFilterAsyncListen(outcomeListener, runtimeFilter); try{ if (isInjectionControlSet) { @@ -111,10 +111,10 @@ public class DataTunnel { } } - private class ThrottlingOutcomeListener implements RpcOutcomeListener<Ack>{ - RpcOutcomeListener<Ack> inner; + private class ThrottlingOutcomeListener implements RpcOutcomeListener<BitData.AckWithCredit>{ + RpcOutcomeListener<BitData.AckWithCredit> inner; - public ThrottlingOutcomeListener(RpcOutcomeListener<Ack> inner) { + public ThrottlingOutcomeListener(RpcOutcomeListener<BitData.AckWithCredit> inner) { super(); this.inner = inner; } @@ -126,7 +126,12 @@ public class DataTunnel { } @Override - public void success(Ack value, ByteBuf buffer) { + public void success(BitData.AckWithCredit value, ByteBuf buffer) { + int credit = value.getAllowedCredit(); + if (credit > 0) { + //received an explicit runtime advice to transfer to the new credit + sendingSemaphore.tryToIncreaseCredit(credit); + } sendingSemaphore.release(); inner.success(value, buffer); } @@ -138,18 +143,18 @@ public class DataTunnel { } } - private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> { + private class SendBatchAsyncListen extends ListeningCommand<BitData.AckWithCredit, DataClientConnection, RpcType, MessageLite> { final FragmentWritableBatch batch; - public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { + public SendBatchAsyncListen(RpcOutcomeListener<BitData.AckWithCredit> listener, FragmentWritableBatch batch) { 
super(listener); this.batch = batch; } @Override - public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { + public void doRpcCall(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, DataClientConnection connection) { connection.send(new ThrottlingOutcomeListener(outcomeListener), getRpcType(), batch.getHeader(), - Ack.class, batch.getBuffers()); + BitData.AckWithCredit.class, batch.getBuffers()); } @Override @@ -176,17 +181,17 @@ public class DataTunnel { } } - private class SendRuntimeFilterAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> { + private class SendRuntimeFilterAsyncListen extends ListeningCommand<BitData.AckWithCredit, DataClientConnection, RpcType, MessageLite> { final RuntimeFilterWritable runtimeFilter; - public SendRuntimeFilterAsyncListen(RpcOutcomeListener<Ack> listener, RuntimeFilterWritable runtimeFilter) { + public SendRuntimeFilterAsyncListen(RpcOutcomeListener<BitData.AckWithCredit> listener, RuntimeFilterWritable runtimeFilter) { super(listener); this.runtimeFilter = runtimeFilter; } @Override - public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { - connection.send(outcomeListener, RpcType.REQ_RUNTIME_FILTER, runtimeFilter.getRuntimeFilterBDef(), Ack.class, runtimeFilter.getData()); + public void doRpcCall(RpcOutcomeListener<BitData.AckWithCredit> outcomeListener, DataClientConnection connection) { + connection.send(outcomeListener, RpcType.REQ_RUNTIME_FILTER, runtimeFilter.getRuntimeFilterBDef(), BitData.AckWithCredit.class, runtimeFilter.getData()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index f9011c6..5d2598c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -314,7 +314,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.METASTORE_CTAS_AUTO_COLLECT_METADATA_VALIDATOR), new OptionDefinition(ExecConstants.METASTORE_FALLBACK_TO_FILE_METADATA_VALIDATOR), new OptionDefinition(ExecConstants.METASTORE_RETRIEVAL_RETRY_ATTEMPTS_VALIDATOR), - new OptionDefinition(ExecConstants.PARQUET_READER_ENABLE_MAP_SUPPORT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, false, false)) + new OptionDefinition(ExecConstants.PARQUET_READER_ENABLE_MAP_SUPPORT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, false, false)), + new OptionDefinition(ExecConstants.ENABLE_DYNAMIC_CREDIT_BASED_FC_VALIDATOR) }; CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 4c0b063..ceb5944 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -70,12 +70,13 @@ public abstract class AbstractDataCollector implements DataCollector { remainingRequired = new AtomicInteger(numBuffers); final boolean spooling = collector.getIsSpooling(); + final boolean enableDynamicFc = collector.hasEnableDynamicFc(); for (int i = 0; i < numBuffers; i++) { if (spooling) { - buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i); + buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, collector.getOppositeMajorFragmentId(), i, enableDynamicFc); } else { - buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity); + buffers[i] = new 
UnlimitedRawBatchBuffer(context, bufferCapacity, enableDynamicFc); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java index db25fd4..440a693 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java @@ -53,13 +53,15 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer { private int streamCounter; private final int fragmentCount; protected final FragmentContext context; + protected final boolean enableDynamicFC; - public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount) { + public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount, final boolean enableDynamicFC) { bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE); this.fragmentCount = fragmentCount; this.streamCounter = fragmentCount; this.context = context; + this.enableDynamicFC = enableDynamicFC; } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index 50f582d..4e7e251 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -59,6 +59,7 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB private static final float STOP_SPOOLING_FRACTION = (float) 0.5; public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + private static final int SPOOLING_SENDER_CREDIT = 20; private enum SpoolingState { NOT_SPOOLING, @@ -80,8 +81,8 @@ public class 
SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB private Path path; private FSDataOutputStream outputStream; - public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) { - super(context, fragmentCount); + public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex, boolean enableDynamicFC) { + super(context, fragmentCount, enableDynamicFC); this.allocator = context.getNewChildAllocator( "SpoolingRawBatchBufer", 100, ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY); @@ -350,7 +351,12 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB this.available = available; this.latch = new CountDownLatch(available ? 0 : 1); if (available) { - batch.sendOk(); + //As we can flush to disc ,we could let the sender to send the batch more rapidly + if (enableDynamicFC) { + batch.sendOk(SPOOLING_SENDER_CREDIT); + } else { + batch.sendOk(); + } } } @@ -374,8 +380,8 @@ public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchB if (batch.getBody() == null) { return 0; } - assert batch.getBody().readableBytes() >= 0; - return batch.getBody().readableBytes(); + assert batch.getBody().capacity() >= 0; + return batch.getBody().capacity(); } public void writeToStream(FSDataOutputStream stream) throws IOException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 0d36d5d..f6380b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.LinkedBlockingDeque; import 
java.util.concurrent.TimeUnit; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RawFragmentBatch; @@ -31,13 +32,25 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch private final int softlimit; private final int startlimit; - - public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount) { - super(context, fragmentCount); + //which controls the receiver total credit when we enable the dynamic credit based flow control. + private int runtimeSoftLimit = -1; + private int runtimeAckCredit = 1; + private int sampleTimes = 0; + private long totalBatchSize = 0L; + private final int fragmentCount; + private final int maxSampleTimes; + private final long thresholdNetworkMem; + + public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount, boolean enableDynamicFC) { + super(context, fragmentCount, enableDynamicFC); this.softlimit = bufferSizePerSocket * fragmentCount; this.startlimit = Math.max(softlimit/2, 1); logger.trace("softLimit: {}, startLimit: {}", softlimit, startlimit); this.bufferQueue = new UnlimitedBufferQueue(); + this.fragmentCount = fragmentCount; + this.sampleTimes = fragmentCount; + this.maxSampleTimes = fragmentCount; + this.thresholdNetworkMem = context.getConfig().getLong(ExecConstants.UNLIMITED_BUFFER_MAX_MEMORY_SIZE); } private class UnlimitedBufferQueue implements BufferQueue<RawFragmentBatch> { @@ -90,14 +103,50 @@ public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch @Override public void add(RawFragmentBatch batch) { + doFlowControl(batch); buffer.add(batch); } } - protected void enqueueInner(final RawFragmentBatch batch) throws IOException { - if (bufferQueue.size() < softlimit) { + private void doFlowControl(RawFragmentBatch batch) { + if (enableDynamicFC) { + calculateDynamicCredit(batch); + if (runtimeSoftLimit > 0) { + //we already make a decision to give a suggest runtime 
sender credit + if (bufferQueue.size() < runtimeSoftLimit) { + //we just send the same suggest credit to the sender + batch.sendOk(runtimeAckCredit); + } + } else if (bufferQueue.size() < softlimit) { + //fallback to the initial static credit based flow control + batch.sendOk(); + } + } else if (bufferQueue.size() < softlimit) { + //still use the initial static sender credit batch.sendOk(); } + } + + private void calculateDynamicCredit(RawFragmentBatch batch) { + int recordCount = batch.getHeader().getDef().getRecordCount(); + long batchByteSize = batch.getBody() == null ? 0 : batch.getBody().capacity(); + if (recordCount != 0) { + //skip first header batch + totalBatchSize += batchByteSize; + sampleTimes++; + } + if (sampleTimes == maxSampleTimes) { + long averageBatchSize = totalBatchSize / sampleTimes; + //make a decision + if (averageBatchSize > 0) { + runtimeSoftLimit = (int) (thresholdNetworkMem / averageBatchSize); + runtimeAckCredit = runtimeSoftLimit / fragmentCount; + runtimeAckCredit = Math.max(runtimeAckCredit, 1); + } + } + } + + protected void enqueueInner(final RawFragmentBatch batch) throws IOException { bufferQueue.add(batch); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index c0eceae..9009484 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -19,13 +19,13 @@ package org.apache.drill.exec.work.filter; import io.netty.buffer.DrillBuf; import org.apache.drill.common.exceptions.DrillRuntimeException; + import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.DataTunnelStatusHandler; import org.apache.drill.exec.ops.SendingAccountor; -import org.apache.drill.exec.ops.StatusHandler; import 
org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; @@ -200,7 +200,7 @@ public class RuntimeFilterSink implements Closeable logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); } }; - RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); + RpcOutcomeListener<BitData.AckWithCredit> statusHandler = new DataTunnelStatusHandler(exceptionConsumer, sendingAccountor); AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler); accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable); } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index bdfeb57..5d5eae1 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -267,6 +267,9 @@ drill.exec: { spooling: { delete: true, size: 100000000 + }, + unlimited_receiver: { + max_size: 500000000 } }, compile: { @@ -540,6 +543,7 @@ drill.exec.options: { exec.query_profile.debug_mode: false, exec.query_profile.save: true, exec.query_profile.alter_session.skip: true, + exec.enable_dynamic_fc: false, exec.queue.enable: false, # Default queue values for an 8 GB direct memory default # Drill install. 
Users are expected to adjust these based diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java index 8ad793d..dcd1cf9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitBitKerberos.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -41,7 +42,6 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.record.FragmentWritableBatch; @@ -145,7 +145,7 @@ public class TestBitBitKerberos extends BaseTestQuery { return WritableBatch.getBatchNoHV(records, vectors, false); } - private class TimingOutcome implements RpcOutcomeListener<Ack> { + private class TimingOutcome implements RpcOutcomeListener<BitData.AckWithCredit> { private AtomicLong max; private Stopwatch watch = Stopwatch.createStarted(); @@ -160,7 +160,7 @@ public class TestBitBitKerberos extends BaseTestQuery { } @Override - public void success(Ack value, ByteBuf buffer) { + public void success(BitData.AckWithCredit value, ByteBuf buffer) { long micros = watch.elapsed(TimeUnit.MILLISECONDS); while (true) { long nowMax = max.get(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java 
b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java index 1ef0ad2..20a7340 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/data/TestBitRpc.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import org.apache.drill.exec.proto.BitData; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import io.netty.buffer.ByteBuf; @@ -32,7 +33,6 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.MaterializedField; @@ -94,6 +94,39 @@ public class TestBitRpc extends ExecTest { Thread.sleep(5000); } + @Test + public void testConnectionBackpressureWithDynamicCredit() throws Exception { + WorkerBee bee = mock(WorkerBee.class); + WorkEventBus workBus = mock(WorkEventBus.class); + DrillConfig config1 = DrillConfig.create(); + BootStrapContext c = new BootStrapContext(config1, SystemOptionManager.createDefaultOptionDefinitions(), ClassPathScanner.fromPrescan(config1)); + FragmentContextImpl fcon = mock(FragmentContextImpl.class); + when(fcon.getAllocator()).thenReturn(c.getAllocator()); + + final FragmentManager fman = new MockFragmentManagerWithDynamicCredit(c); + + when(workBus.getFragmentManager(any(FragmentHandle.class))).thenReturn(fman); + + int port = 1234; + + DataConnectionConfig config = new DataConnectionConfig(c.getAllocator(), c, + new DataServerRequestHandler(workBus, bee)); + DataServer server = new DataServer(config); + + port = server.bind(port, 
true); + DrillbitEndpoint ep = DrillbitEndpoint.newBuilder().setAddress("localhost").setDataPort(port).build(); + DataConnectionManager manager = new DataConnectionManager(ep, config); + DataTunnel tunnel = new DataTunnel(manager); + AtomicLong max = new AtomicLong(0); + for (int i = 0; i < 40; i++) { + long t1 = System.currentTimeMillis(); + tunnel.sendRecordBatch(new TimingOutcome(max), new FragmentWritableBatch(false, QueryId.getDefaultInstance(), 1, + 1, 1, 1, getRandomBatch(c.getAllocator(), 5000))); + } + assertTrue(max.get() > 2700); + Thread.sleep(5000); + } + private static WritableBatch getRandomBatch(BufferAllocator allocator, int records) { List<ValueVector> vectors = Lists.newArrayList(); for (int i = 0; i < 5; i++) { @@ -107,7 +140,7 @@ public class TestBitRpc extends ExecTest { return WritableBatch.getBatchNoHV(records, vectors, false); } - private class TimingOutcome implements RpcOutcomeListener<Ack> { + private class TimingOutcome implements RpcOutcomeListener<BitData.AckWithCredit> { private AtomicLong max; private Stopwatch watch = Stopwatch.createStarted(); @@ -122,7 +155,7 @@ public class TestBitRpc extends ExecTest { } @Override - public void success(Ack value, ByteBuf buffer) { + public void success(BitData.AckWithCredit value, ByteBuf buffer) { long micros = watch.elapsed(TimeUnit.MILLISECONDS); while (true) { long nowMax = max.get(); @@ -209,4 +242,76 @@ public class TestBitRpc extends ExecTest { } } + + public static class MockFragmentManagerWithDynamicCredit implements FragmentManager { + private final BootStrapContext c; + private int v; + private int times = 0; + + public MockFragmentManagerWithDynamicCredit(BootStrapContext c) { + this.c = c; + } + + @Override + public boolean handle(IncomingDataBatch batch) throws FragmentSetupException, IOException { + try { + v++; + if (v % 10 == 0) { + Thread.sleep(3000); + } + } catch (InterruptedException e) { + + } + times++; + RawFragmentBatch rfb = batch.newRawFragmentBatch(c.getAllocator()); 
+ if (times > 3) { + rfb.sendOk(4); + } else { + rfb.sendOk(); + } + rfb.release(); + + return true; + } + + @Override + public FragmentExecutor getRunnable() { + return null; + } + + @Override + public void cancel() { + + } + + @Override + public boolean isCancelled() { + return false; + } + + @Override + public void unpause() { + + } + + @Override + public boolean isWaiting() { + return false; + } + + @Override + public FragmentHandle getHandle() { + return null; + } + + @Override + public FragmentContext getFragmentContext() { + return null; + } + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + + } + } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Acks.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Acks.java index cd1c972..92549bc 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Acks.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/Acks.java @@ -24,4 +24,9 @@ public class Acks { public static final Ack OK = Ack.newBuilder().setOk(true).build(); public static final Ack FAIL = Ack.newBuilder().setOk(false).build(); + //-------To dynamic credit value: -1 means the receiver failed to solve, 0 means no explicit credit and the sender keeps its sender credit. 
+ // a value which is great than 0 means having an explicit credit value + public static final int FAIL_CREDIT = -1; + + public static final int NO_SUGGESTED_CREDIT = 0; } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java index 3bf0c0a..ee603e3 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitControl.java @@ -7133,6 +7133,15 @@ public final class BitControl { * <code>optional bool is_spooling = 4;</code> */ boolean getIsSpooling(); + + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + boolean hasEnableDynamicFc(); + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + boolean getEnableDynamicFc(); } /** * Protobuf type {@code exec.bit.control.Collector} @@ -7151,6 +7160,7 @@ public final class BitControl { incomingMinorFragment_ = java.util.Collections.emptyList(); supportsOutOfOrder_ = false; isSpooling_ = false; + enableDynamicFc_ = false; } @java.lang.Override @@ -7213,6 +7223,11 @@ public final class BitControl { isSpooling_ = input.readBool(); break; } + case 40: { + bitField0_ |= 0x00000008; + enableDynamicFc_ = input.readBool(); + break; + } default: { if (!parseUnknownField( input, unknownFields, extensionRegistry, tag)) { @@ -7317,6 +7332,21 @@ public final class BitControl { return isSpooling_; } + public static final int ENABLE_DYNAMIC_FC_FIELD_NUMBER = 5; + private boolean enableDynamicFc_; + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public boolean hasEnableDynamicFc() { + return ((bitField0_ & 0x00000008) == 0x00000008); + } + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public boolean getEnableDynamicFc() { + return enableDynamicFc_; + } + private byte memoizedIsInitialized = -1; @java.lang.Override public final boolean isInitialized() { @@ -7348,6 +7378,9 @@ public final class BitControl { if 
(((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBool(4, isSpooling_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + output.writeBool(5, enableDynamicFc_); + } unknownFields.writeTo(output); } @@ -7383,6 +7416,10 @@ public final class BitControl { size += com.google.protobuf.CodedOutputStream .computeBoolSize(4, isSpooling_); } + if (((bitField0_ & 0x00000008) == 0x00000008)) { + size += com.google.protobuf.CodedOutputStream + .computeBoolSize(5, enableDynamicFc_); + } size += unknownFields.getSerializedSize(); memoizedSize = size; return size; @@ -7416,6 +7453,11 @@ public final class BitControl { result = result && (getIsSpooling() == other.getIsSpooling()); } + result = result && (hasEnableDynamicFc() == other.hasEnableDynamicFc()); + if (hasEnableDynamicFc()) { + result = result && (getEnableDynamicFc() + == other.getEnableDynamicFc()); + } result = result && unknownFields.equals(other.unknownFields); return result; } @@ -7445,6 +7487,11 @@ public final class BitControl { hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( getIsSpooling()); } + if (hasEnableDynamicFc()) { + hash = (37 * hash) + ENABLE_DYNAMIC_FC_FIELD_NUMBER; + hash = (53 * hash) + com.google.protobuf.Internal.hashBoolean( + getEnableDynamicFc()); + } hash = (29 * hash) + unknownFields.hashCode(); memoizedHashCode = hash; return hash; @@ -7586,6 +7633,8 @@ public final class BitControl { bitField0_ = (bitField0_ & ~0x00000004); isSpooling_ = false; bitField0_ = (bitField0_ & ~0x00000008); + enableDynamicFc_ = false; + bitField0_ = (bitField0_ & ~0x00000010); return this; } @@ -7631,6 +7680,10 @@ public final class BitControl { to_bitField0_ |= 0x00000004; } result.isSpooling_ = isSpooling_; + if (((from_bitField0_ & 0x00000010) == 0x00000010)) { + to_bitField0_ |= 0x00000008; + } + result.enableDynamicFc_ = enableDynamicFc_; result.bitField0_ = to_bitField0_; onBuilt(); return result; @@ -7699,6 +7752,9 @@ public final class BitControl { if (other.hasIsSpooling()) { 
setIsSpooling(other.getIsSpooling()); } + if (other.hasEnableDynamicFc()) { + setEnableDynamicFc(other.getEnableDynamicFc()); + } this.mergeUnknownFields(other.unknownFields); onChanged(); return this; @@ -7890,6 +7946,38 @@ public final class BitControl { onChanged(); return this; } + + private boolean enableDynamicFc_ ; + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public boolean hasEnableDynamicFc() { + return ((bitField0_ & 0x00000010) == 0x00000010); + } + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public boolean getEnableDynamicFc() { + return enableDynamicFc_; + } + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public Builder setEnableDynamicFc(boolean value) { + bitField0_ |= 0x00000010; + enableDynamicFc_ = value; + onChanged(); + return this; + } + /** + * <code>optional bool enable_dynamic_fc = 5;</code> + */ + public Builder clearEnableDynamicFc() { + bitField0_ = (bitField0_ & ~0x00000010); + enableDynamicFc_ = false; + onChanged(); + return this; + } @java.lang.Override public final Builder setUnknownFields( final com.google.protobuf.UnknownFieldSet unknownFields) { @@ -10720,29 +10808,30 @@ public final class BitControl { "ls\022\024\n\014options_json\030\017 \001(\t\022:\n\007context\030\020 \001(" + "\0132).exec.bit.control.QueryContextInforma" + "tion\022.\n\tcollector\030\021 \003(\0132\033.exec.bit.contr" + - "ol.Collector\"\210\001\n\tCollector\022\"\n\032opposite_m" + + "ol.Collector\"\243\001\n\tCollector\022\"\n\032opposite_m" + "ajor_fragment_id\030\001 \001(\005\022#\n\027incoming_minor" + "_fragment\030\002 \003(\005B\002\020\001\022\035\n\025supports_out_of_o" + - "rder\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\"w\n\027Query" + - "ContextInformation\022\030\n\020query_start_time\030\001" + - " \001(\003\022\021\n\ttime_zone\030\002 \001(\005\022\033\n\023default_schem" + - "a_name\030\003 \001(\t\022\022\n\nsession_id\030\004 \001(\t\"f\n\017Work" + - 
"QueueStatus\022(\n\010endpoint\030\001 \001(\0132\026.exec.Dri" + - "llbitEndpoint\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013r" + - "eport_time\030\003 \001(\003\"h\n\020FinishedReceiver\022*\n\010" + - "receiver\030\001 \001(\0132\030.exec.bit.FragmentHandle" + - "\022(\n\006sender\030\002 \001(\0132\030.exec.bit.FragmentHand" + - "le*\206\003\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013" + - "\n\007GOODBYE\020\002\022\034\n\030REQ_INITIALIZE_FRAGMENTS\020" + - "\003\022\027\n\023REQ_CANCEL_FRAGMENT\020\006\022\031\n\025REQ_RECEIV" + - "ER_FINISHED\020\007\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022" + - "\n\016REQ_BIT_STATUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n" + - "\022\024\n\020REQ_QUERY_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FR" + - "AGMENT\020\020\022\016\n\nREQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMEN" + - "T_HANDLE\020\013\022\030\n\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017" + - "RESP_BIT_STATUS\020\r\022\025\n\021RESP_QUERY_STATUS\020\016" + - "\022\017\n\013RESP_CUSTOM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033o" + - "rg.apache.drill.exec.protoB\nBitControlH\001" + "rder\030\003 \001(\010\022\023\n\013is_spooling\030\004 \001(\010\022\031\n\021enabl" + + "e_dynamic_fc\030\005 \001(\010\"w\n\027QueryContextInform" + + "ation\022\030\n\020query_start_time\030\001 \001(\003\022\021\n\ttime_" + + "zone\030\002 \001(\005\022\033\n\023default_schema_name\030\003 \001(\t\022" + + "\022\n\nsession_id\030\004 \001(\t\"f\n\017WorkQueueStatus\022(" + + "\n\010endpoint\030\001 \001(\0132\026.exec.DrillbitEndpoint" + + "\022\024\n\014queue_length\030\002 \001(\005\022\023\n\013report_time\030\003 " + + "\001(\003\"h\n\020FinishedReceiver\022*\n\010receiver\030\001 \001(" + + "\0132\030.exec.bit.FragmentHandle\022(\n\006sender\030\002 " + + "\001(\0132\030.exec.bit.FragmentHandle*\206\003\n\007RpcTyp" + + "e\022\r\n\tHANDSHAKE\020\000\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\034" + + 
"\n\030REQ_INITIALIZE_FRAGMENTS\020\003\022\027\n\023REQ_CANC" + + "EL_FRAGMENT\020\006\022\031\n\025REQ_RECEIVER_FINISHED\020\007" + + "\022\027\n\023REQ_FRAGMENT_STATUS\020\010\022\022\n\016REQ_BIT_STA" + + "TUS\020\t\022\024\n\020REQ_QUERY_STATUS\020\n\022\024\n\020REQ_QUERY" + + "_CANCEL\020\017\022\030\n\024REQ_UNPAUSE_FRAGMENT\020\020\022\016\n\nR" + + "EQ_CUSTOM\020\021\022\030\n\024RESP_FRAGMENT_HANDLE\020\013\022\030\n" + + "\024RESP_FRAGMENT_STATUS\020\014\022\023\n\017RESP_BIT_STAT" + + "US\020\r\022\025\n\021RESP_QUERY_STATUS\020\016\022\017\n\013RESP_CUST" + + "OM\020\022\022\020\n\014SASL_MESSAGE\020\023B+\n\033org.apache.dri" + + "ll.exec.protoB\nBitControlH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -10800,7 +10889,7 @@ public final class BitControl { internal_static_exec_bit_control_Collector_fieldAccessorTable = new com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_exec_bit_control_Collector_descriptor, - new java.lang.String[] { "OppositeMajorFragmentId", "IncomingMinorFragment", "SupportsOutOfOrder", "IsSpooling", }); + new java.lang.String[] { "OppositeMajorFragmentId", "IncomingMinorFragment", "SupportsOutOfOrder", "IsSpooling", "EnableDynamicFc", }); internal_static_exec_bit_control_QueryContextInformation_descriptor = getDescriptor().getMessageTypes().get(7); internal_static_exec_bit_control_QueryContextInformation_fieldAccessorTable = new diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java index 6a17435..514e35e 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/BitData.java @@ -72,6 +72,14 @@ public final class BitData { * <code>REQ_RUNTIME_FILTER = 5;</code> */ REQ_RUNTIME_FILTER(5), + /** + * <pre> + * a ack 
for data tunnel,with a runtime suggested credit as a response. + * </pre> + * + * <code>DATA_ACK_WITH_CREDIT = 6;</code> + */ + DATA_ACK_WITH_CREDIT(6), ; /** @@ -110,6 +118,14 @@ public final class BitData { * <code>REQ_RUNTIME_FILTER = 5;</code> */ public static final int REQ_RUNTIME_FILTER_VALUE = 5; + /** + * <pre> + * a ack for data tunnel,with a runtime suggested credit as a response. + * </pre> + * + * <code>DATA_ACK_WITH_CREDIT = 6;</code> + */ + public static final int DATA_ACK_WITH_CREDIT_VALUE = 6; public final int getNumber() { @@ -132,6 +148,7 @@ public final class BitData { case 3: return REQ_RECORD_BATCH; case 4: return SASL_MESSAGE; case 5: return REQ_RUNTIME_FILTER; + case 6: return DATA_ACK_WITH_CREDIT; default: return null; } } @@ -4372,6 +4389,543 @@ public final class BitData { } + public interface AckWithCreditOrBuilder extends + // @@protoc_insertion_point(interface_extends:exec.bit.data.AckWithCredit) + com.google.protobuf.MessageOrBuilder { + + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + boolean hasAllowedCredit(); + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + int getAllowedCredit(); + } + /** + * Protobuf type {@code exec.bit.data.AckWithCredit} + */ + public static final class AckWithCredit extends + com.google.protobuf.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:exec.bit.data.AckWithCredit) + AckWithCreditOrBuilder { + private static final long serialVersionUID = 0L; + // Use AckWithCredit.newBuilder() to construct. 
+ private AckWithCredit(com.google.protobuf.GeneratedMessageV3.Builder<?> builder) { + super(builder); + } + private AckWithCredit() { + allowedCredit_ = 0; + } + + @java.lang.Override + public final com.google.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private AckWithCredit( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + com.google.protobuf.UnknownFieldSet.Builder unknownFields = + com.google.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 8: { + bitField0_ |= 0x00000001; + allowedCredit_ = input.readInt32(); + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new com.google.protobuf.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.drill.exec.proto.BitData.internal_static_exec_bit_data_AckWithCredit_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.drill.exec.proto.BitData.internal_static_exec_bit_data_AckWithCredit_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.drill.exec.proto.BitData.AckWithCredit.class, 
org.apache.drill.exec.proto.BitData.AckWithCredit.Builder.class); + } + + private int bitField0_; + public static final int ALLOWED_CREDIT_FIELD_NUMBER = 1; + private int allowedCredit_; + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public boolean hasAllowedCredit() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public int getAllowedCredit() { + return allowedCredit_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(com.google.protobuf.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeInt32(1, allowedCredit_); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += com.google.protobuf.CodedOutputStream + .computeInt32Size(1, allowedCredit_); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof org.apache.drill.exec.proto.BitData.AckWithCredit)) { + return super.equals(obj); + } + org.apache.drill.exec.proto.BitData.AckWithCredit other = (org.apache.drill.exec.proto.BitData.AckWithCredit) obj; + + boolean result = true; + result = result && (hasAllowedCredit() == other.hasAllowedCredit()); + if 
(hasAllowedCredit()) { + result = result && (getAllowedCredit() + == other.getAllowedCredit()); + } + result = result && unknownFields.equals(other.unknownFields); + return result; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (hasAllowedCredit()) { + hash = (37 * hash) + ALLOWED_CREDIT_FIELD_NUMBER; + hash = (53 * hash) + getAllowedCredit(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + java.nio.ByteBuffer data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + java.nio.ByteBuffer data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + com.google.protobuf.ByteString data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + com.google.protobuf.ByteString data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom(byte[] data) + throws com.google.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + byte[] data, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + 
return PARSER.parseFrom(data, extensionRegistry); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseDelimitedFrom( + java.io.InputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + com.google.protobuf.CodedInputStream input) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static org.apache.drill.exec.proto.BitData.AckWithCredit parseFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return com.google.protobuf.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(org.apache.drill.exec.proto.BitData.AckWithCredit 
prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code exec.bit.data.AckWithCredit} + */ + public static final class Builder extends + com.google.protobuf.GeneratedMessageV3.Builder<Builder> implements + // @@protoc_insertion_point(builder_implements:exec.bit.data.AckWithCredit) + org.apache.drill.exec.proto.BitData.AckWithCreditOrBuilder { + public static final com.google.protobuf.Descriptors.Descriptor + getDescriptor() { + return org.apache.drill.exec.proto.BitData.internal_static_exec_bit_data_AckWithCredit_descriptor; + } + + @java.lang.Override + protected com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return org.apache.drill.exec.proto.BitData.internal_static_exec_bit_data_AckWithCredit_fieldAccessorTable + .ensureFieldAccessorsInitialized( + org.apache.drill.exec.proto.BitData.AckWithCredit.class, org.apache.drill.exec.proto.BitData.AckWithCredit.Builder.class); + } + + // Construct using org.apache.drill.exec.proto.BitData.AckWithCredit.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + com.google.protobuf.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (com.google.protobuf.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + allowedCredit_ = 0; + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + @java.lang.Override + public com.google.protobuf.Descriptors.Descriptor + getDescriptorForType() 
{ + return org.apache.drill.exec.proto.BitData.internal_static_exec_bit_data_AckWithCredit_descriptor; + } + + @java.lang.Override + public org.apache.drill.exec.proto.BitData.AckWithCredit getDefaultInstanceForType() { + return org.apache.drill.exec.proto.BitData.AckWithCredit.getDefaultInstance(); + } + + @java.lang.Override + public org.apache.drill.exec.proto.BitData.AckWithCredit build() { + org.apache.drill.exec.proto.BitData.AckWithCredit result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public org.apache.drill.exec.proto.BitData.AckWithCredit buildPartial() { + org.apache.drill.exec.proto.BitData.AckWithCredit result = new org.apache.drill.exec.proto.BitData.AckWithCredit(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.allowedCredit_ = allowedCredit_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return (Builder) super.clone(); + } + @java.lang.Override + public Builder setField( + com.google.protobuf.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return (Builder) super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + com.google.protobuf.Descriptors.FieldDescriptor field) { + return (Builder) super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + com.google.protobuf.Descriptors.OneofDescriptor oneof) { + return (Builder) super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return (Builder) super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + com.google.protobuf.Descriptors.FieldDescriptor field, + 
java.lang.Object value) { + return (Builder) super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(com.google.protobuf.Message other) { + if (other instanceof org.apache.drill.exec.proto.BitData.AckWithCredit) { + return mergeFrom((org.apache.drill.exec.proto.BitData.AckWithCredit)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(org.apache.drill.exec.proto.BitData.AckWithCredit other) { + if (other == org.apache.drill.exec.proto.BitData.AckWithCredit.getDefaultInstance()) return this; + if (other.hasAllowedCredit()) { + setAllowedCredit(other.getAllowedCredit()); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + return true; + } + + @java.lang.Override + public Builder mergeFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + org.apache.drill.exec.proto.BitData.AckWithCredit parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (com.google.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (org.apache.drill.exec.proto.BitData.AckWithCredit) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private int allowedCredit_ ; + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public boolean hasAllowedCredit() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public int getAllowedCredit() { + return allowedCredit_; + } + /** + * <pre> + * 
the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public Builder setAllowedCredit(int value) { + bitField0_ |= 0x00000001; + allowedCredit_ = value; + onChanged(); + return this; + } + /** + * <pre> + * the credit allowed the sender to send in batch granularity + * </pre> + * + * <code>optional int32 allowed_credit = 1;</code> + */ + public Builder clearAllowedCredit() { + bitField0_ = (bitField0_ & ~0x00000001); + allowedCredit_ = 0; + onChanged(); + return this; + } + @java.lang.Override + public final Builder setUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final com.google.protobuf.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:exec.bit.data.AckWithCredit) + } + + // @@protoc_insertion_point(class_scope:exec.bit.data.AckWithCredit) + private static final org.apache.drill.exec.proto.BitData.AckWithCredit DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new org.apache.drill.exec.proto.BitData.AckWithCredit(); + } + + public static org.apache.drill.exec.proto.BitData.AckWithCredit getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final com.google.protobuf.Parser<AckWithCredit> + PARSER = new com.google.protobuf.AbstractParser<AckWithCredit>() { + @java.lang.Override + public AckWithCredit parsePartialFrom( + com.google.protobuf.CodedInputStream input, + com.google.protobuf.ExtensionRegistryLite extensionRegistry) + throws com.google.protobuf.InvalidProtocolBufferException { + return new AckWithCredit(input, extensionRegistry); + } + }; + + public static com.google.protobuf.Parser<AckWithCredit> parser() { + return PARSER; + } + + @java.lang.Override + public com.google.protobuf.Parser<AckWithCredit> 
getParserForType() { + return PARSER; + } + + @java.lang.Override + public org.apache.drill.exec.proto.BitData.AckWithCredit getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + private static final com.google.protobuf.Descriptors.Descriptor internal_static_exec_bit_data_BitClientHandshake_descriptor; private static final @@ -4392,6 +4946,11 @@ public final class BitData { private static final com.google.protobuf.GeneratedMessageV3.FieldAccessorTable internal_static_exec_bit_data_RuntimeFilterBDef_fieldAccessorTable; + private static final com.google.protobuf.Descriptors.Descriptor + internal_static_exec_bit_data_AckWithCredit_descriptor; + private static final + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable + internal_static_exec_bit_data_AckWithCredit_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { @@ -4420,11 +4979,12 @@ public final class BitData { "ment_id\030\003 \001(\005\022\022\n\nto_foreman\030\004 \001(\010\022\"\n\032blo" + "om_filter_size_in_bytes\030\005 \003(\005\022\024\n\014probe_f" + "ields\030\006 \003(\t\022\020\n\010hj_op_id\030\007 \001(\005\022\025\n\rrf_iden" + - "tifier\030\010 \001(\003*n\n\007RpcType\022\r\n\tHANDSHAKE\020\000\022\007" + - "\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BATCH" + - "\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FILT" + - "ER\020\005B(\n\033org.apache.drill.exec.protoB\007Bit" + - "DataH\001" + "tifier\030\010 \001(\003\"\'\n\rAckWithCredit\022\026\n\016allowed" + + "_credit\030\001 \001(\005*\210\001\n\007RpcType\022\r\n\tHANDSHAKE\020\000" + + "\022\007\n\003ACK\020\001\022\013\n\007GOODBYE\020\002\022\024\n\020REQ_RECORD_BAT" + + "CH\020\003\022\020\n\014SASL_MESSAGE\020\004\022\026\n\022REQ_RUNTIME_FI" + + "LTER\020\005\022\030\n\024DATA_ACK_WITH_CREDIT\020\006B(\n\033org." 
+ + "apache.drill.exec.protoB\007BitDataH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor. InternalDescriptorAssigner() { @@ -4465,6 +5025,12 @@ public final class BitData { com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( internal_static_exec_bit_data_RuntimeFilterBDef_descriptor, new java.lang.String[] { "QueryId", "MajorFragmentId", "MinorFragmentId", "ToForeman", "BloomFilterSizeInBytes", "ProbeFields", "HjOpId", "RfIdentifier", }); + internal_static_exec_bit_data_AckWithCredit_descriptor = + getDescriptor().getMessageTypes().get(4); + internal_static_exec_bit_data_AckWithCredit_fieldAccessorTable = new + com.google.protobuf.GeneratedMessageV3.FieldAccessorTable( + internal_static_exec_bit_data_AckWithCredit_descriptor, + new java.lang.String[] { "AllowedCredit", }); org.apache.drill.exec.proto.ExecProtos.getDescriptor(); org.apache.drill.exec.proto.CoordinationProtos.getDescriptor(); org.apache.drill.exec.proto.UserBitShared.getDescriptor(); diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java index 23b2638..6ec3b1f 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitControl.java @@ -865,6 +865,8 @@ public final class SchemaBitControl output.writeBool(3, message.getSupportsOutOfOrder(), false); if(message.hasIsSpooling()) output.writeBool(4, message.getIsSpooling(), false); + if(message.hasEnableDynamicFc()) + output.writeBool(5, message.getEnableDynamicFc(), false); } public boolean isInitialized(org.apache.drill.exec.proto.BitControl.Collector message) { @@ -916,6 +918,9 @@ public final class SchemaBitControl case 4: builder.setIsSpooling(input.readBool()); break; + case 5: + builder.setEnableDynamicFc(input.readBool()); + break; default: 
input.handleUnknownField(number, this); } @@ -960,6 +965,7 @@ public final class SchemaBitControl case 2: return "incomingMinorFragment"; case 3: return "supportsOutOfOrder"; case 4: return "isSpooling"; + case 5: return "enableDynamicFc"; default: return null; } } @@ -975,6 +981,7 @@ public final class SchemaBitControl fieldMap.put("incomingMinorFragment", 2); fieldMap.put("supportsOutOfOrder", 3); fieldMap.put("isSpooling", 4); + fieldMap.put("enableDynamicFc", 5); } } diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java index 091c690..d08ba82 100644 --- a/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java +++ b/protocol/src/main/java/org/apache/drill/exec/proto/SchemaBitData.java @@ -579,4 +579,115 @@ public final class SchemaBitData } } + public static final class AckWithCredit + { + public static final org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.MessageSchema WRITE = + new org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.MessageSchema(); + public static final org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.BuilderSchema MERGE = + new org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.BuilderSchema(); + + public static class MessageSchema implements io.protostuff.Schema<org.apache.drill.exec.proto.BitData.AckWithCredit> + { + public void writeTo(io.protostuff.Output output, org.apache.drill.exec.proto.BitData.AckWithCredit message) throws java.io.IOException + { + if(message.hasAllowedCredit()) + output.writeInt32(1, message.getAllowedCredit(), false); + } + public boolean isInitialized(org.apache.drill.exec.proto.BitData.AckWithCredit message) + { + return message.isInitialized(); + } + public java.lang.String getFieldName(int number) + { + return org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.getFieldName(number); + } + public int getFieldNumber(java.lang.String name) + { + return 
org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.getFieldNumber(name); + } + public java.lang.Class<org.apache.drill.exec.proto.BitData.AckWithCredit> typeClass() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.class; + } + public java.lang.String messageName() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.class.getSimpleName(); + } + public java.lang.String messageFullName() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.class.getName(); + } + //unused + public void mergeFrom(io.protostuff.Input input, org.apache.drill.exec.proto.BitData.AckWithCredit message) throws java.io.IOException {} + public org.apache.drill.exec.proto.BitData.AckWithCredit newMessage() { return null; } + } + public static class BuilderSchema implements io.protostuff.Schema<org.apache.drill.exec.proto.BitData.AckWithCredit.Builder> + { + public void mergeFrom(io.protostuff.Input input, org.apache.drill.exec.proto.BitData.AckWithCredit.Builder builder) throws java.io.IOException + { + for(int number = input.readFieldNumber(this);; number = input.readFieldNumber(this)) + { + switch(number) + { + case 0: + return; + case 1: + builder.setAllowedCredit(input.readInt32()); + break; + default: + input.handleUnknownField(number, this); + } + } + } + public boolean isInitialized(org.apache.drill.exec.proto.BitData.AckWithCredit.Builder builder) + { + return builder.isInitialized(); + } + public org.apache.drill.exec.proto.BitData.AckWithCredit.Builder newMessage() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.newBuilder(); + } + public java.lang.String getFieldName(int number) + { + return org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.getFieldName(number); + } + public int getFieldNumber(java.lang.String name) + { + return org.apache.drill.exec.proto.SchemaBitData.AckWithCredit.getFieldNumber(name); + } + public java.lang.Class<org.apache.drill.exec.proto.BitData.AckWithCredit.Builder> typeClass() + { + return 
org.apache.drill.exec.proto.BitData.AckWithCredit.Builder.class; + } + public java.lang.String messageName() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.class.getSimpleName(); + } + public java.lang.String messageFullName() + { + return org.apache.drill.exec.proto.BitData.AckWithCredit.class.getName(); + } + //unused + public void writeTo(io.protostuff.Output output, org.apache.drill.exec.proto.BitData.AckWithCredit.Builder builder) throws java.io.IOException {} + } + public static java.lang.String getFieldName(int number) + { + switch(number) + { + case 1: return "allowedCredit"; + default: return null; + } + } + public static int getFieldNumber(java.lang.String name) + { + java.lang.Integer number = fieldMap.get(name); + return number == null ? 0 : number.intValue(); + } + private static final java.util.HashMap<java.lang.String,java.lang.Integer> fieldMap = new java.util.HashMap<java.lang.String,java.lang.Integer>(); + static + { + fieldMap.put("allowedCredit", 1); + } + } + } diff --git a/protocol/src/main/protobuf/BitControl.proto b/protocol/src/main/protobuf/BitControl.proto index 44335d6..f95f40f 100644 --- a/protocol/src/main/protobuf/BitControl.proto +++ b/protocol/src/main/protobuf/BitControl.proto @@ -104,6 +104,7 @@ message Collector { repeated int32 incoming_minor_fragment = 2 [packed=true]; optional bool supports_out_of_order = 3; optional bool is_spooling = 4; + optional bool enable_dynamic_fc = 5; } message QueryContextInformation { diff --git a/protocol/src/main/protobuf/BitData.proto b/protocol/src/main/protobuf/BitData.proto index 177ff9f..32f2e84 100644 --- a/protocol/src/main/protobuf/BitData.proto +++ b/protocol/src/main/protobuf/BitData.proto @@ -36,6 +36,8 @@ enum RpcType { SASL_MESSAGE = 4; REQ_RUNTIME_FILTER = 5; // send runtime filter data from HashJoin to Foreman, from Foreman to Scan nodes. + + DATA_ACK_WITH_CREDIT = 6; // a ack for data tunnel,with a runtime suggested credit as a response. 
} message BitClientHandshake{ @@ -68,3 +70,7 @@ message RuntimeFilterBDef{ optional int32 hj_op_id = 7; // the operator id of the HashJoin which generates this RuntimeFilter optional int64 rf_identifier = 8; // the runtime filter identifier } + +message AckWithCredit{ + optional int32 allowed_credit = 1; // the credit allowed the sender to send in batch granularity +}
