This is an automated email from the ASF dual-hosted git repository.

nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 498140c  Making emit, ack, and fail thread safe (#2692)
498140c is described below

commit 498140c2674fcdf73f6ce13209266f5e35d50af6
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue May 22 15:21:56 2018 -0700

    Making emit, ack, and fail thread safe (#2692)
    
    * Making emit, ack, and fail thread safe
    
    * fixing checkstyle
    
    * refactoring and adding synchronization to state save
---
 .../heron/instance/AbstractOutputCollector.java    |   4 +-
 .../heron/instance/OutgoingTupleCollection.java    | 165 +++++++++++++--------
 .../apache/heron/instance/bolt/BoltInstance.java   |  17 ++-
 .../apache/heron/instance/spout/SpoutInstance.java |  15 +-
 4 files changed, 130 insertions(+), 71 deletions(-)

diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index aac6443..8d5f1fa 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -46,6 +47,7 @@ public class AbstractOutputCollector {
   private long totalTuplesEmitted;
   private long totalBytesEmitted;
   private PhysicalPlanHelper helper;
+  public final ReentrantLock lock = new ReentrantLock();
 
   /**
    * The SuppressWarnings is only until TOPOLOGY_ENABLE_ACKING exists.
@@ -79,7 +81,7 @@ public class AbstractOutputCollector {
       }
     }
 
-    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue);
+    this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index 8775a0c..7c28290 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -20,6 +20,8 @@
 package org.apache.heron.instance;
 
 import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.protobuf.ByteString;
 import com.google.protobuf.Message;
@@ -60,14 +62,17 @@ public class OutgoingTupleCollection {
   private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
 
   // Total data emitted in bytes for the entire life
-  private long totalDataEmittedInBytes;
+  private AtomicLong totalDataEmittedInBytes = new AtomicLong();
 
   // Current size in bytes for data types to pack into the HeronTupleSet
   private long currentDataTupleSizeInBytes;
 
+  private final ReentrantLock lock;
+
   public OutgoingTupleCollection(
       PhysicalPlanHelper helper,
-      Communicator<Message> outQueue) {
+      Communicator<Message> outQueue,
+      ReentrantLock lock) {
     this.outQueue = outQueue;
     this.helper = helper;
     SystemConfig systemConfig =
@@ -77,17 +82,23 @@ public class OutgoingTupleCollection {
         SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
 
     // Initialize the values in constructor
-    this.totalDataEmittedInBytes = 0;
+    this.totalDataEmittedInBytes.set(0);
     this.currentDataTupleSizeInBytes = 0;
 
     // Read the config values
     this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
     this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
     this.controlTupleSetCapacity = systemConfig.getInstanceSetControlTupleCapacity();
+    this.lock = lock;
   }
 
   public void sendOutTuples() {
-    flushRemaining();
+    lock.lock();
+    try {
+      flushRemaining();
+    } finally {
+      lock.unlock();
+    }
   }
 
   /**
@@ -97,71 +108,93 @@ public class OutgoingTupleCollection {
    */
   public void sendOutState(State<Serializable, Serializable> state,
                            String checkpointId) {
-    // flush all the current data before sending the state
-    flushRemaining();
-
-    // Serialize the state
-    byte[] serializedState = serializer.serialize(state);
-
-    // Construct the instance state checkpoint
-    CheckpointManager.InstanceStateCheckpoint instanceState =
-        CheckpointManager.InstanceStateCheckpoint.newBuilder()
-          .setCheckpointId(checkpointId)
-          .setState(ByteString.copyFrom(serializedState))
-          .build();
-
-    CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
-        CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
-            .setState(instanceState)
-            .build();
-
-    // Put the checkpoint to out stream queue
-    outQueue.offer(storeRequest);
+    lock.lock();
+    try {
+      // flush all the current data before sending the state
+      flushRemaining();
+
+      // Serialize the state
+      byte[] serializedState = serializer.serialize(state);
+
+      // Construct the instance state checkpoint
+      CheckpointManager.InstanceStateCheckpoint instanceState =
+          CheckpointManager.InstanceStateCheckpoint.newBuilder()
+              .setCheckpointId(checkpointId)
+              .setState(ByteString.copyFrom(serializedState))
+              .build();
+
+      CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
+          CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
+              .setState(instanceState)
+              .build();
+
+      // Put the checkpoint to out stream queue
+      outQueue.offer(storeRequest);
+    } finally {
+      lock.unlock();
+    }
   }
 
   public void addDataTuple(
       String streamId,
       HeronTuples.HeronDataTuple.Builder newTuple,
       long tupleSizeInBytes) {
-    if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
-      throw new RuntimeException(
-          String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId,
-              tupleSizeInBytes));
+    lock.lock();
+    try {
+      if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
+        throw new RuntimeException(
+            String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId,
+                tupleSizeInBytes));
+      }
+      if (currentDataTuple == null
+          || !currentDataTuple.getStream().getId().equals(streamId)
+          || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
+          || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
+        initNewDataTuple(streamId);
+      }
+      currentDataTuple.addTuples(newTuple);
+
+      currentDataTupleSizeInBytes += tupleSizeInBytes;
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    if (currentDataTuple == null
-        || !currentDataTuple.getStream().getId().equals(streamId)
-        || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
-        || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
-      initNewDataTuple(streamId);
-    }
-    currentDataTuple.addTuples(newTuple);
-
-    currentDataTupleSizeInBytes += tupleSizeInBytes;
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
-  public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getFailsCount() > 0
-        || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
+  public void addAckTuple(
+      HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+    lock.lock();
+    try {
+      if (currentControlTuple == null
+          || currentControlTuple.getFailsCount() > 0
+          || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
+        initNewControlTuple();
+      }
+      currentControlTuple.addAcks(newTuple);
+
+      // Add the size of data in bytes ready to send out
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    currentControlTuple.addAcks(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
-  public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
-    if (currentControlTuple == null
-        || currentControlTuple.getAcksCount() > 0
-        || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
-      initNewControlTuple();
+  public void addFailTuple(
+      HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+    lock.lock();
+    try {
+      if (currentControlTuple == null
+          || currentControlTuple.getAcksCount() > 0
+          || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
+        initNewControlTuple();
+      }
+      currentControlTuple.addFails(newTuple);
+
+      // Add the size of data in bytes ready to send out
+      totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+    } finally {
+      lock.unlock();
     }
-    currentControlTuple.addFails(newTuple);
-
-    // Add the size of data in bytes ready to send out
-    totalDataEmittedInBytes += tupleSizeInBytes;
   }
 
   private void initNewDataTuple(String streamId) {
@@ -215,18 +248,28 @@ public class OutgoingTupleCollection {
   }
 
   public long getTotalDataEmittedInBytes() {
-    return totalDataEmittedInBytes;
+    return totalDataEmittedInBytes.get();
   }
 
   // Clean the internal state of OutgoingTupleCollection
   public void clear() {
-    currentControlTuple = null;
-    currentDataTuple = null;
+    lock.lock();
+    try {
+      currentControlTuple = null;
+      currentDataTuple = null;
 
-    outQueue.clear();
+      outQueue.clear();
+    } finally {
+      lock.unlock();
+    }
   }
 
   public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
-    this.helper = physicalPlanHelper;
+    lock.lock();
+    try {
+      this.helper = physicalPlanHelper;
+    } finally {
+      lock.unlock();
+    }
   }
 }
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 773d144..6286e15 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -144,12 +144,19 @@ public class BoltInstance implements IInstance {
       throw new RuntimeException("Could not save a non-stateful topology's state");
     }
 
-    // Checkpoint
-    if (bolt instanceof IStatefulComponent) {
-      ((IStatefulComponent) bolt).preSave(checkpointId);
-    }
+    // need to synchronize with other OutgoingTupleCollection operations
+    // so that topology emit, ack, fail are thread safe
+    collector.lock.lock();
+    try {
+      // Checkpoint
+      if (bolt instanceof IStatefulComponent) {
+        ((IStatefulComponent) bolt).preSave(checkpointId);
+      }
 
-    collector.sendOutState(instanceState, checkpointId);
+      collector.sendOutState(instanceState, checkpointId);
+    } finally {
+      collector.lock.unlock();
+    }
   }
 
   @SuppressWarnings("unchecked")
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index cbb5e31..65d80ef 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -153,11 +153,18 @@ public class SpoutInstance implements IInstance {
       throw new RuntimeException("Could not save a non-stateful topology's state");
     }
 
-    if (spout instanceof IStatefulComponent) {
-      ((IStatefulComponent) spout).preSave(checkpointId);
-    }
+    // need to synchronize with other OutgoingTupleCollection operations
+    // so that topology emit, ack, fail are thread safe
+    collector.lock.lock();
+    try {
+      if (spout instanceof IStatefulComponent) {
+        ((IStatefulComponent) spout).preSave(checkpointId);
+      }
 
-    collector.sendOutState(instanceState, checkpointId);
+      collector.sendOutState(instanceState, checkpointId);
+    } finally {
+      collector.lock.unlock();
+    }
   }
 
   @SuppressWarnings("unchecked")

-- 
To stop receiving notification emails like this one, please contact
[email protected].

Reply via email to