This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 1c47ef8 While emitting in spout, adhere to the batch size limit (#2798) 1c47ef8 is described below commit 1c47ef86ba57eb612cf965285211dae1a3ce3196 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Apr 3 11:54:44 2018 -0700 While emitting in spout, adhere to the batch size limit (#2798) * While emitting in spout, adhere to the batch size limit * Added accessor * Added missing imports * Make sure that C++ instance also adheres to batch emit size constraint --- heron/common/src/cpp/config/heron-internals-config-reader.cpp | 5 +++++ heron/common/src/cpp/config/heron-internals-config-reader.h | 3 +++ heron/common/src/cpp/config/heron-internals-config-vars.cpp | 2 ++ heron/common/src/cpp/config/heron-internals-config-vars.h | 3 +++ heron/instance/src/cpp/slave/outgoing-tuple-collection.h | 2 +- heron/instance/src/cpp/spoutimpl/spout-instance.cpp | 8 +++++++- heron/instance/src/cpp/spoutimpl/spout-instance.h | 2 ++ heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h | 1 + .../java/com/twitter/heron/instance/AbstractOutputCollector.java | 7 +++++++ .../src/java/com/twitter/heron/instance/spout/SpoutInstance.java | 8 ++++++++ 10 files changed, 39 insertions(+), 2 deletions(-) diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp b/heron/common/src/cpp/config/heron-internals-config-reader.cpp index 05c4b49..174dcb8 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp @@ -317,6 +317,11 @@ int HeronInternalsConfigReader::GetHeronInstanceEmitBatchTimeMs() { .as<int>(); } +int HeronInternalsConfigReader::GetHeronInstanceEmitBatchSize() { + return config_[HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE] + .as<int>(); +} + int HeronInternalsConfigReader::GetHeronInstanceSetDataTupleCapacity() { return config_[HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY] .as<int>(); diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h b/heron/common/src/cpp/config/heron-internals-config-reader.h index 696c58f..d5a34a1 100644 --- a/heron/common/src/cpp/config/heron-internals-config-reader.h +++ b/heron/common/src/cpp/config/heron-internals-config-reader.h @@ -216,6 +216,9 @@ class HeronInternalsConfigReader : public YamlFileReader { // The maximum time in ms for an spout instance to emit tuples per attempt int GetHeronInstanceEmitBatchTimeMs(); + // The maximum number of bytes for an spout instance to emit tuples per attempt + int GetHeronInstanceEmitBatchSize(); + // The maximum # of data tuple to batch in a HeronDataTupleSet protobuf int GetHeronInstanceSetDataTupleCapacity(); diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp b/heron/common/src/cpp/config/heron-internals-config-vars.cpp index 373ba4a..0906eaa 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp +++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp @@ -128,6 +128,8 @@ const sp_string HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QU "heron.instance.internal.spout.write.queue.capacity"; const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_TIME_MS = "heron.instance.emit.batch.time.ms"; +const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE = + "heron.instance.emit.batch.size.bytes"; const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY = "heron.instance.set.data.tuple.capacity"; const sp_string HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES = diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h b/heron/common/src/cpp/config/heron-internals-config-vars.h index bcab89a..5ac3752 100644 --- a/heron/common/src/cpp/config/heron-internals-config-vars.h +++ b/heron/common/src/cpp/config/heron-internals-config-vars.h @@ -206,6 +206,9 @@ class HeronInternalsConfigVars { // The maximum time in ms for an spout instance to emit tuples per attempt static const sp_string HERON_INSTANCE_EMIT_BATCH_TIME_MS; + // The maximum number of bytes for n spout instance to emit tuples per attempt + static const sp_string HERON_INSTANCE_EMIT_BATCH_SIZE; + // The maximum # of data tuple to batch in a HeronDataTupleSet protobuf static const sp_string HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY; diff --git a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h index b9d722b..0acc76b 100644 --- a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h +++ b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h @@ -43,12 +43,12 @@ class OutgoingTupleCollection { int tupleSize); int64_t getTotalDataTuplesEmitted() const; + int64_t getTotalDataSizeEmitted() const; private: void initNewDataTuple(const std::string& streamId); void initNewControlTuple(); void flushRemaining(); - int64_t getTotalDataSizeEmitted() const; std::string componentName_; NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_; diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp index 422776a..ed36430 100644 --- a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp +++ b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp @@ -40,6 +40,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop, ->GetHeronInstanceInternalSpoutWriteQueueCapacity(); maxEmitBatchIntervalMs_ = config::HeronInternalsConfigReader::Instance() ->GetHeronInstanceEmitBatchTimeMs(); + maxEmitBatchSize_ = config::HeronInternalsConfigReader::Instance() + ->GetHeronInstanceEmitBatchSize(); ackingEnabled_ = taskContext->isAckingEnabled(); enableMessageTimeouts_ = taskContext->enableMessageTimeouts(); lookForTimeoutsTimer_ = -1; @@ -61,7 +63,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop, collector_.reset(new SpoutOutputCollectorImpl(serializer_, taskContext_, dataFromSlave_)); LOG(INFO) << "Instantiated spout for component " << taskContext->getThisComponentName() << " with task_id " << taskContext->getThisTaskId() << " and maxWriteBufferSize_ " - << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << maxEmitBatchIntervalMs_; + << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << maxEmitBatchIntervalMs_ + << " and maxEmitBatchSize " << maxEmitBatchSize_; } SpoutInstance::~SpoutInstance() { @@ -131,6 +134,7 @@ void SpoutInstance::produceTuple() { int maxSpoutPending = atoi(taskContext_->getConfig() ->get(api::config::Config::TOPOLOGY_MAX_SPOUT_PENDING).c_str()); int64_t totalTuplesEmitted = collector_->getTotalDataTuplesEmitted(); + int64_t totalBytesEmitted = collector_->getTotalDataBytesEmitted(); int64_t startTime = std::chrono::duration_cast<std::chrono::nanoseconds>( std::chrono::system_clock::now().time_since_epoch()).count(); int64_t startOfCall = startTime; @@ -141,9 +145,11 @@ void SpoutInstance::produceTuple() { std::chrono::system_clock::now().time_since_epoch()).count(); metrics_->nextTuple(currentTime - startOfCall); int64_t newTotalTuplesEmitted = collector_->getTotalDataTuplesEmitted(); + int64_t newTotalBytesEmitted = collector_->getTotalDataBytesEmitted(); if (newTotalTuplesEmitted == totalTuplesEmitted) break; totalTuplesEmitted = newTotalTuplesEmitted; if (currentTime - startTime > maxEmitBatchIntervalMs_ * 1000 * 1000) break; + if (newTotalBytesEmitted - totalBytesEmitted > maxEmitBatchSize_) break; startOfCall = currentTime; } } diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.h b/heron/instance/src/cpp/spoutimpl/spout-instance.h index 462e14a..e83498e 100644 --- a/heron/instance/src/cpp/spoutimpl/spout-instance.h +++ b/heron/instance/src/cpp/spoutimpl/spout-instance.h @@ -74,6 +74,8 @@ class SpoutInstance : public InstanceBase { int maxWriteBufferSize_; // This is the max time to spend in emitting tuple in one go int maxEmitBatchIntervalMs_; + // This is the max number of bytes to emit in one go + int maxEmitBatchSize_; }; } // namespace instance diff --git a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h index 7d19433..da953e6 100644 --- a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h +++ b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h @@ -47,6 +47,7 @@ class SpoutOutputCollectorImpl : public api::spout::ISpoutOutputCollector { virtual void reportError(std::exception& except); int64_t getTotalDataTuplesEmitted() const { return collector_->getTotalDataTuplesEmitted(); } + int64_t getTotalDataBytesEmitted() const { return collector_->getTotalDataSizeEmitted(); } int64_t numInFlight() const { return inflightTuples_.size(); } int getImmediateAcksSize() const { return immediateAcks_.size(); } std::shared_ptr<RootTupleInfo> getImmediateAcksFront() { diff --git a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java index b7ec1e7..0db45df 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java +++ b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java @@ -38,6 +38,7 @@ public class AbstractOutputCollector { protected final ComponentMetrics metrics; protected final boolean ackEnabled; private long totalTuplesEmitted; + private long totalBytesEmitted; private PhysicalPlanHelper helper; /** @@ -52,6 +53,7 @@ public class AbstractOutputCollector { this.serializer = serializer; this.metrics = metrics; this.totalTuplesEmitted = 0; + this.totalBytesEmitted = 0; this.helper = helper; Map<String, Object> config = helper.getTopologyContext().getTopologyConfig(); @@ -117,6 +119,10 @@ public class AbstractOutputCollector { return totalTuplesEmitted; } + public long getTotalBytesEmitted() { + return totalBytesEmitted; + } + protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String streamId, List<Object> tuple, Integer emitDirectTaskId) { @@ -168,6 +174,7 @@ public class AbstractOutputCollector { // submit to outputter outputter.addDataTuple(streamId, bldr, tupleSizeInBytes); totalTuplesEmitted++; + totalBytesEmitted += tupleSizeInBytes; // Update metrics metrics.emittedTuple(streamId); diff --git a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java index 100d293..28e1161 100644 --- a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java +++ b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java @@ -33,6 +33,7 @@ import com.twitter.heron.api.state.State; import com.twitter.heron.api.topology.IStatefulComponent; import com.twitter.heron.api.topology.IUpdatable; import com.twitter.heron.api.utils.Utils; +import com.twitter.heron.common.basics.ByteAmount; import com.twitter.heron.common.basics.Communicator; import com.twitter.heron.common.basics.SingletonRegistry; import com.twitter.heron.common.basics.SlaveLooper; @@ -306,8 +307,10 @@ public class SpoutInstance implements IInstance { int maxSpoutPending = TypeUtils.getInteger(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING)); long totalTuplesEmitted = collector.getTotalTuplesEmitted(); + long totalBytesEmitted = collector.getTotalBytesEmitted(); Duration instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime(); + ByteAmount instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize(); long startOfCycle = System.nanoTime(); @@ -326,6 +329,7 @@ public class SpoutInstance implements IInstance { spoutMetrics.nextTuple(latency); long newTotalTuplesEmitted = collector.getTotalTuplesEmitted(); + long newTotalBytesEmitted = collector.getTotalBytesEmitted(); if (newTotalTuplesEmitted == totalTuplesEmitted) { // No tuples to emit.... break; @@ -337,6 +341,10 @@ public class SpoutInstance implements IInstance { if (currentTime - startOfCycle - instanceEmitBatchTime.toNanos() > 0) { break; } + if (!ByteAmount.fromBytes(newTotalBytesEmitted - totalBytesEmitted) + .lessThan(instanceEmitBatchSize)) { + break; + } } } -- To stop receiving notification emails like this one, please contact karth...@apache.org.