This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c47ef8  While emitting in spout, adhere to the batch size limit 
(#2798)
1c47ef8 is described below

commit 1c47ef86ba57eb612cf965285211dae1a3ce3196
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Apr 3 11:54:44 2018 -0700

    While emitting in spout, adhere to the batch size limit (#2798)
    
    * While emitting in spout, adhere to the batch size limit
    
    * Added accessor
    
    * Added missing imports
    
    * Make sure that C++ instance also adheres to batch emit size constraint
---
 heron/common/src/cpp/config/heron-internals-config-reader.cpp     | 5 +++++
 heron/common/src/cpp/config/heron-internals-config-reader.h       | 3 +++
 heron/common/src/cpp/config/heron-internals-config-vars.cpp       | 2 ++
 heron/common/src/cpp/config/heron-internals-config-vars.h         | 3 +++
 heron/instance/src/cpp/slave/outgoing-tuple-collection.h          | 2 +-
 heron/instance/src/cpp/spoutimpl/spout-instance.cpp               | 8 +++++++-
 heron/instance/src/cpp/spoutimpl/spout-instance.h                 | 2 ++
 heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h    | 1 +
 .../java/com/twitter/heron/instance/AbstractOutputCollector.java  | 7 +++++++
 .../src/java/com/twitter/heron/instance/spout/SpoutInstance.java  | 8 ++++++++
 10 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.cpp 
b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
index 05c4b49..174dcb8 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.cpp
@@ -317,6 +317,11 @@ int 
HeronInternalsConfigReader::GetHeronInstanceEmitBatchTimeMs() {
       .as<int>();
 }
 
+int HeronInternalsConfigReader::GetHeronInstanceEmitBatchSize() {
+  return config_[HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE]
+      .as<int>();
+}
+
 int HeronInternalsConfigReader::GetHeronInstanceSetDataTupleCapacity() {
   return 
config_[HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY]
       .as<int>();
diff --git a/heron/common/src/cpp/config/heron-internals-config-reader.h 
b/heron/common/src/cpp/config/heron-internals-config-reader.h
index 696c58f..d5a34a1 100644
--- a/heron/common/src/cpp/config/heron-internals-config-reader.h
+++ b/heron/common/src/cpp/config/heron-internals-config-reader.h
@@ -216,6 +216,9 @@ class HeronInternalsConfigReader : public YamlFileReader {
   // The maximum time in ms for an spout instance to emit tuples per attempt
   int GetHeronInstanceEmitBatchTimeMs();
 
+  // The maximum number of bytes for an spout instance to emit tuples per 
attempt
+  int GetHeronInstanceEmitBatchSize();
+
   // The maximum # of data tuple to batch in a HeronDataTupleSet protobuf
   int GetHeronInstanceSetDataTupleCapacity();
 
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.cpp 
b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
index 373ba4a..0906eaa 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.cpp
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.cpp
@@ -128,6 +128,8 @@ const sp_string 
HeronInternalsConfigVars::HERON_INSTANCE_INTERNAL_SPOUT_WRITE_QU
     "heron.instance.internal.spout.write.queue.capacity";
 const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_TIME_MS =
     "heron.instance.emit.batch.time.ms";
+const sp_string HeronInternalsConfigVars::HERON_INSTANCE_EMIT_BATCH_SIZE =
+    "heron.instance.emit.batch.size.bytes";
 const sp_string 
HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY =
     "heron.instance.set.data.tuple.capacity";
 const sp_string 
HeronInternalsConfigVars::HERON_INSTANCE_SET_DATA_TUPLE_SIZE_BYTES =
diff --git a/heron/common/src/cpp/config/heron-internals-config-vars.h 
b/heron/common/src/cpp/config/heron-internals-config-vars.h
index bcab89a..5ac3752 100644
--- a/heron/common/src/cpp/config/heron-internals-config-vars.h
+++ b/heron/common/src/cpp/config/heron-internals-config-vars.h
@@ -206,6 +206,9 @@ class HeronInternalsConfigVars {
   // The maximum time in ms for an spout instance to emit tuples per attempt
   static const sp_string HERON_INSTANCE_EMIT_BATCH_TIME_MS;
 
+  // The maximum number of bytes for an spout instance to emit tuples per 
attempt
+  static const sp_string HERON_INSTANCE_EMIT_BATCH_SIZE;
+
   // The maximum # of data tuple to batch in a HeronDataTupleSet protobuf
   static const sp_string HERON_INSTANCE_SET_DATA_TUPLE_CAPACITY;
 
diff --git a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h 
b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
index b9d722b..0acc76b 100644
--- a/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
+++ b/heron/instance/src/cpp/slave/outgoing-tuple-collection.h
@@ -43,12 +43,12 @@ class OutgoingTupleCollection {
                     int tupleSize);
 
   int64_t getTotalDataTuplesEmitted() const;
+  int64_t getTotalDataSizeEmitted() const;
 
  private:
   void initNewDataTuple(const std::string& streamId);
   void initNewControlTuple();
   void flushRemaining();
-  int64_t getTotalDataSizeEmitted() const;
 
   std::string componentName_;
   NotifyingCommunicator<google::protobuf::Message*>* dataFromSlave_;
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp 
b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
index 422776a..ed36430 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.cpp
@@ -40,6 +40,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop,
                                
->GetHeronInstanceInternalSpoutWriteQueueCapacity();
   maxEmitBatchIntervalMs_ = config::HeronInternalsConfigReader::Instance()
                              ->GetHeronInstanceEmitBatchTimeMs();
+  maxEmitBatchSize_ = config::HeronInternalsConfigReader::Instance()
+                             ->GetHeronInstanceEmitBatchSize();
   ackingEnabled_ = taskContext->isAckingEnabled();
   enableMessageTimeouts_ = taskContext->enableMessageTimeouts();
   lookForTimeoutsTimer_ = -1;
@@ -61,7 +63,8 @@ SpoutInstance::SpoutInstance(EventLoop* eventLoop,
   collector_.reset(new SpoutOutputCollectorImpl(serializer_, taskContext_, 
dataFromSlave_));
   LOG(INFO) << "Instantiated spout for component " << 
taskContext->getThisComponentName()
             << " with task_id " << taskContext->getThisTaskId() << " and 
maxWriteBufferSize_ "
-            << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << 
maxEmitBatchIntervalMs_;
+            << maxWriteBufferSize_ << " and maxEmitBatchIntervalMs " << 
maxEmitBatchIntervalMs_
+            << " and maxEmitBatchSize " << maxEmitBatchSize_;
 }
 
 SpoutInstance::~SpoutInstance() {
@@ -131,6 +134,7 @@ void SpoutInstance::produceTuple() {
   int maxSpoutPending = atoi(taskContext_->getConfig()
                              
->get(api::config::Config::TOPOLOGY_MAX_SPOUT_PENDING).c_str());
   int64_t totalTuplesEmitted = collector_->getTotalDataTuplesEmitted();
+  int64_t totalBytesEmitted = collector_->getTotalDataBytesEmitted();
   int64_t startTime = std::chrono::duration_cast<std::chrono::nanoseconds>(
                                    
std::chrono::system_clock::now().time_since_epoch()).count();
   int64_t startOfCall = startTime;
@@ -141,9 +145,11 @@ void SpoutInstance::produceTuple() {
                                    
std::chrono::system_clock::now().time_since_epoch()).count();
     metrics_->nextTuple(currentTime - startOfCall);
     int64_t newTotalTuplesEmitted = collector_->getTotalDataTuplesEmitted();
+    int64_t newTotalBytesEmitted = collector_->getTotalDataBytesEmitted();
     if (newTotalTuplesEmitted == totalTuplesEmitted) break;
     totalTuplesEmitted = newTotalTuplesEmitted;
     if (currentTime - startTime > maxEmitBatchIntervalMs_ * 1000 * 1000) break;
+    if (newTotalBytesEmitted - totalBytesEmitted > maxEmitBatchSize_) break;
     startOfCall = currentTime;
   }
 }
diff --git a/heron/instance/src/cpp/spoutimpl/spout-instance.h 
b/heron/instance/src/cpp/spoutimpl/spout-instance.h
index 462e14a..e83498e 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-instance.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-instance.h
@@ -74,6 +74,8 @@ class SpoutInstance : public InstanceBase {
   int maxWriteBufferSize_;
   // This is the max time to spend in emitting tuple in one go
   int maxEmitBatchIntervalMs_;
+  // This is the max number of bytes to emit in one go
+  int maxEmitBatchSize_;
 };
 
 }  // namespace instance
diff --git a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h 
b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
index 7d19433..da953e6 100644
--- a/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
+++ b/heron/instance/src/cpp/spoutimpl/spout-output-collector-impl.h
@@ -47,6 +47,7 @@ class SpoutOutputCollectorImpl : public 
api::spout::ISpoutOutputCollector {
   virtual void reportError(std::exception& except);
 
   int64_t getTotalDataTuplesEmitted() const { return 
collector_->getTotalDataTuplesEmitted(); }
+  int64_t getTotalDataBytesEmitted() const { return 
collector_->getTotalDataSizeEmitted(); }
   int64_t numInFlight() const { return inflightTuples_.size(); }
   int getImmediateAcksSize() const { return immediateAcks_.size(); }
   std::shared_ptr<RootTupleInfo> getImmediateAcksFront() {
diff --git 
a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
 
b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
index b7ec1e7..0db45df 100644
--- 
a/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
+++ 
b/heron/instance/src/java/com/twitter/heron/instance/AbstractOutputCollector.java
@@ -38,6 +38,7 @@ public class AbstractOutputCollector {
   protected final ComponentMetrics metrics;
   protected final boolean ackEnabled;
   private long totalTuplesEmitted;
+  private long totalBytesEmitted;
   private PhysicalPlanHelper helper;
 
   /**
@@ -52,6 +53,7 @@ public class AbstractOutputCollector {
     this.serializer = serializer;
     this.metrics = metrics;
     this.totalTuplesEmitted = 0;
+    this.totalBytesEmitted = 0;
     this.helper = helper;
 
     Map<String, Object> config = 
helper.getTopologyContext().getTopologyConfig();
@@ -117,6 +119,10 @@ public class AbstractOutputCollector {
     return totalTuplesEmitted;
   }
 
+  public long getTotalBytesEmitted() {
+    return totalBytesEmitted;
+  }
+
   protected HeronTuples.HeronDataTuple.Builder initTupleBuilder(String 
streamId,
                                                                 List<Object> 
tuple,
                                                                 Integer 
emitDirectTaskId) {
@@ -168,6 +174,7 @@ public class AbstractOutputCollector {
     // submit to outputter
     outputter.addDataTuple(streamId, bldr, tupleSizeInBytes);
     totalTuplesEmitted++;
+    totalBytesEmitted += tupleSizeInBytes;
 
     // Update metrics
     metrics.emittedTuple(streamId);
diff --git 
a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java 
b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
index 100d293..28e1161 100644
--- 
a/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
+++ 
b/heron/instance/src/java/com/twitter/heron/instance/spout/SpoutInstance.java
@@ -33,6 +33,7 @@ import com.twitter.heron.api.state.State;
 import com.twitter.heron.api.topology.IStatefulComponent;
 import com.twitter.heron.api.topology.IUpdatable;
 import com.twitter.heron.api.utils.Utils;
+import com.twitter.heron.common.basics.ByteAmount;
 import com.twitter.heron.common.basics.Communicator;
 import com.twitter.heron.common.basics.SingletonRegistry;
 import com.twitter.heron.common.basics.SlaveLooper;
@@ -306,8 +307,10 @@ public class SpoutInstance implements IInstance {
     int maxSpoutPending = 
TypeUtils.getInteger(config.get(Config.TOPOLOGY_MAX_SPOUT_PENDING));
 
     long totalTuplesEmitted = collector.getTotalTuplesEmitted();
+    long totalBytesEmitted = collector.getTotalBytesEmitted();
 
     Duration instanceEmitBatchTime = systemConfig.getInstanceEmitBatchTime();
+    ByteAmount instanceEmitBatchSize = systemConfig.getInstanceEmitBatchSize();
 
     long startOfCycle = System.nanoTime();
 
@@ -326,6 +329,7 @@ public class SpoutInstance implements IInstance {
       spoutMetrics.nextTuple(latency);
 
       long newTotalTuplesEmitted = collector.getTotalTuplesEmitted();
+      long newTotalBytesEmitted = collector.getTotalBytesEmitted();
       if (newTotalTuplesEmitted == totalTuplesEmitted) {
         // No tuples to emit....
         break;
@@ -337,6 +341,10 @@ public class SpoutInstance implements IInstance {
       if (currentTime - startOfCycle - instanceEmitBatchTime.toNanos() > 0) {
         break;
       }
+      if (!ByteAmount.fromBytes(newTotalBytesEmitted - totalBytesEmitted)
+           .lessThan(instanceEmitBatchSize)) {
+        break;
+      }
     }
   }
 

-- 
To stop receiving notification emails like this one, please contact
karth...@apache.org.

Reply via email to