This is an automated email from the ASF dual-hosted git repository.
nlu90 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 498140c Making emit, ack, and fail thread safe (#2692)
498140c is described below
commit 498140c2674fcdf73f6ce13209266f5e35d50af6
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Tue May 22 15:21:56 2018 -0700
Making emit, ack, and fail thread safe (#2692)
* Making emit, ack, and fail thread safe
* fixing checkstyle
* refactoring and adding synchronization to state save
---
.../heron/instance/AbstractOutputCollector.java | 4 +-
.../heron/instance/OutgoingTupleCollection.java | 165 +++++++++++++--------
.../apache/heron/instance/bolt/BoltInstance.java | 17 ++-
.../apache/heron/instance/spout/SpoutInstance.java | 15 +-
4 files changed, 130 insertions(+), 71 deletions(-)
diff --git a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
index aac6443..8d5f1fa 100644
--- a/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
+++ b/heron/instance/src/java/org/apache/heron/instance/AbstractOutputCollector.java
@@ -23,6 +23,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -46,6 +47,7 @@ public class AbstractOutputCollector {
private long totalTuplesEmitted;
private long totalBytesEmitted;
private PhysicalPlanHelper helper;
+ public final ReentrantLock lock = new ReentrantLock();
/**
* The SuppressWarnings is only until TOPOLOGY_ENABLE_ACKING exists.
@@ -79,7 +81,7 @@ public class AbstractOutputCollector {
}
}
- this.outputter = new OutgoingTupleCollection(helper, streamOutQueue);
+ this.outputter = new OutgoingTupleCollection(helper, streamOutQueue, lock);
}
public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
diff --git a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
index 8775a0c..7c28290 100644
--- a/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
+++ b/heron/instance/src/java/org/apache/heron/instance/OutgoingTupleCollection.java
@@ -20,6 +20,8 @@
package org.apache.heron.instance;
import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -60,14 +62,17 @@ public class OutgoingTupleCollection {
private HeronTuples.HeronControlTupleSet.Builder currentControlTuple;
// Total data emitted in bytes for the entire life
- private long totalDataEmittedInBytes;
+ private AtomicLong totalDataEmittedInBytes = new AtomicLong();
// Current size in bytes for data types to pack into the HeronTupleSet
private long currentDataTupleSizeInBytes;
+ private final ReentrantLock lock;
+
public OutgoingTupleCollection(
PhysicalPlanHelper helper,
- Communicator<Message> outQueue) {
+ Communicator<Message> outQueue,
+ ReentrantLock lock) {
this.outQueue = outQueue;
this.helper = helper;
SystemConfig systemConfig =
@@ -77,17 +82,23 @@ public class OutgoingTupleCollection {
SerializeDeSerializeHelper.getSerializer(helper.getTopologyContext().getTopologyConfig());
// Initialize the values in constructor
- this.totalDataEmittedInBytes = 0;
+ this.totalDataEmittedInBytes.set(0);
this.currentDataTupleSizeInBytes = 0;
// Read the config values
this.dataTupleSetCapacity = systemConfig.getInstanceSetDataTupleCapacity();
this.maxDataTupleSize = systemConfig.getInstanceSetDataTupleSize();
this.controlTupleSetCapacity =
systemConfig.getInstanceSetControlTupleCapacity();
+ this.lock = lock;
}
public void sendOutTuples() {
- flushRemaining();
+ lock.lock();
+ try {
+ flushRemaining();
+ } finally {
+ lock.unlock();
+ }
}
/**
@@ -97,71 +108,93 @@ public class OutgoingTupleCollection {
*/
public void sendOutState(State<Serializable, Serializable> state,
String checkpointId) {
- // flush all the current data before sending the state
- flushRemaining();
-
- // Serialize the state
- byte[] serializedState = serializer.serialize(state);
-
- // Construct the instance state checkpoint
- CheckpointManager.InstanceStateCheckpoint instanceState =
- CheckpointManager.InstanceStateCheckpoint.newBuilder()
- .setCheckpointId(checkpointId)
- .setState(ByteString.copyFrom(serializedState))
- .build();
-
- CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
- CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
- .setState(instanceState)
- .build();
-
- // Put the checkpoint to out stream queue
- outQueue.offer(storeRequest);
+ lock.lock();
+ try {
+ // flush all the current data before sending the state
+ flushRemaining();
+
+ // Serialize the state
+ byte[] serializedState = serializer.serialize(state);
+
+ // Construct the instance state checkpoint
+ CheckpointManager.InstanceStateCheckpoint instanceState =
+ CheckpointManager.InstanceStateCheckpoint.newBuilder()
+ .setCheckpointId(checkpointId)
+ .setState(ByteString.copyFrom(serializedState))
+ .build();
+
+ CheckpointManager.StoreInstanceStateCheckpoint storeRequest =
+ CheckpointManager.StoreInstanceStateCheckpoint.newBuilder()
+ .setState(instanceState)
+ .build();
+
+ // Put the checkpoint to out stream queue
+ outQueue.offer(storeRequest);
+ } finally {
+ lock.unlock();
+ }
}
public void addDataTuple(
String streamId,
HeronTuples.HeronDataTuple.Builder newTuple,
long tupleSizeInBytes) {
- if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
- throw new RuntimeException(
- String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId,
- tupleSizeInBytes));
+ lock.lock();
+ try {
+ if (tupleSizeInBytes > maxDataTupleSize.asBytes()) {
+ throw new RuntimeException(
+ String.format("Data tuple (stream id: %s) is too large: %d bytes", streamId,
+ tupleSizeInBytes));
+ }
+ if (currentDataTuple == null
+ || !currentDataTuple.getStream().getId().equals(streamId)
+ || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
+ || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
+ initNewDataTuple(streamId);
+ }
+ currentDataTuple.addTuples(newTuple);
+
+ currentDataTupleSizeInBytes += tupleSizeInBytes;
+ totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+ } finally {
+ lock.unlock();
}
- if (currentDataTuple == null
- || !currentDataTuple.getStream().getId().equals(streamId)
- || currentDataTuple.getTuplesCount() >= dataTupleSetCapacity
- || currentDataTupleSizeInBytes >= maxDataTupleSize.asBytes()) {
- initNewDataTuple(streamId);
- }
- currentDataTuple.addTuples(newTuple);
-
- currentDataTupleSizeInBytes += tupleSizeInBytes;
- totalDataEmittedInBytes += tupleSizeInBytes;
}
- public void addAckTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
- if (currentControlTuple == null
- || currentControlTuple.getFailsCount() > 0
- || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
- initNewControlTuple();
+ public void addAckTuple(
+ HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+ lock.lock();
+ try {
+ if (currentControlTuple == null
+ || currentControlTuple.getFailsCount() > 0
+ || currentControlTuple.getAcksCount() >= controlTupleSetCapacity) {
+ initNewControlTuple();
+ }
+ currentControlTuple.addAcks(newTuple);
+
+ // Add the size of data in bytes ready to send out
+ totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+ } finally {
+ lock.unlock();
}
- currentControlTuple.addAcks(newTuple);
-
- // Add the size of data in bytes ready to send out
- totalDataEmittedInBytes += tupleSizeInBytes;
}
- public void addFailTuple(HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
- if (currentControlTuple == null
- || currentControlTuple.getAcksCount() > 0
- || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
- initNewControlTuple();
+ public void addFailTuple(
+ HeronTuples.AckTuple.Builder newTuple, long tupleSizeInBytes) {
+ lock.lock();
+ try {
+ if (currentControlTuple == null
+ || currentControlTuple.getAcksCount() > 0
+ || currentControlTuple.getFailsCount() >= controlTupleSetCapacity) {
+ initNewControlTuple();
+ }
+ currentControlTuple.addFails(newTuple);
+
+ // Add the size of data in bytes ready to send out
+ totalDataEmittedInBytes.getAndAdd(tupleSizeInBytes);
+ } finally {
+ lock.unlock();
}
- currentControlTuple.addFails(newTuple);
-
- // Add the size of data in bytes ready to send out
- totalDataEmittedInBytes += tupleSizeInBytes;
}
private void initNewDataTuple(String streamId) {
@@ -215,18 +248,28 @@ public class OutgoingTupleCollection {
}
public long getTotalDataEmittedInBytes() {
- return totalDataEmittedInBytes;
+ return totalDataEmittedInBytes.get();
}
// Clean the internal state of OutgoingTupleCollection
public void clear() {
- currentControlTuple = null;
- currentDataTuple = null;
+ lock.lock();
+ try {
+ currentControlTuple = null;
+ currentDataTuple = null;
- outQueue.clear();
+ outQueue.clear();
+ } finally {
+ lock.unlock();
+ }
}
public void updatePhysicalPlanHelper(PhysicalPlanHelper physicalPlanHelper) {
- this.helper = physicalPlanHelper;
+ lock.lock();
+ try {
+ this.helper = physicalPlanHelper;
+ } finally {
+ lock.unlock();
+ }
}
}
diff --git a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
index 773d144..6286e15 100644
--- a/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/bolt/BoltInstance.java
@@ -144,12 +144,19 @@ public class BoltInstance implements IInstance {
throw new RuntimeException("Could not save a non-stateful topology's state");
}
- // Checkpoint
- if (bolt instanceof IStatefulComponent) {
- ((IStatefulComponent) bolt).preSave(checkpointId);
- }
+ // need to synchronize with other OutgoingTupleCollection operations
+ // so that topology emit, ack, fail are thread safe
+ collector.lock.lock();
+ try {
+ // Checkpoint
+ if (bolt instanceof IStatefulComponent) {
+ ((IStatefulComponent) bolt).preSave(checkpointId);
+ }
- collector.sendOutState(instanceState, checkpointId);
+ collector.sendOutState(instanceState, checkpointId);
+ } finally {
+ collector.lock.unlock();
+ }
}
@SuppressWarnings("unchecked")
diff --git a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
index cbb5e31..65d80ef 100644
--- a/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
+++ b/heron/instance/src/java/org/apache/heron/instance/spout/SpoutInstance.java
@@ -153,11 +153,18 @@ public class SpoutInstance implements IInstance {
throw new RuntimeException("Could not save a non-stateful topology's state");
}
- if (spout instanceof IStatefulComponent) {
- ((IStatefulComponent) spout).preSave(checkpointId);
- }
+ // need to synchronize with other OutgoingTupleCollection operations
+ // so that topology emit, ack, fail are thread safe
+ collector.lock.lock();
+ try {
+ if (spout instanceof IStatefulComponent) {
+ ((IStatefulComponent) spout).preSave(checkpointId);
+ }
- collector.sendOutState(instanceState, checkpointId);
+ collector.sendOutState(instanceState, checkpointId);
+ } finally {
+ collector.lock.unlock();
+ }
}
@SuppressWarnings("unchecked")
--
To stop receiving notification emails like this one, please contact
[email protected].