This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d97b57c Fixed deadlock between create function and leader
initialization (#7508)
d97b57c is described below
commit d97b57c776e21103e7e32cb7167eb50ae0c2c677
Author: Matteo Merli <[email protected]>
AuthorDate: Sat Jul 11 08:37:04 2020 -0700
Fixed deadlock between create function and leader initialization (#7508)
---
.../functions/worker/FunctionMetaDataManager.java | 68 ++++++++++++----------
1 file changed, 37 insertions(+), 31 deletions(-)
diff --git
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index abe2b38..4ec1ef5 100644
---
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -199,45 +199,51 @@ public class FunctionMetaDataManager implements
AutoCloseable {
* @throws IllegalStateException if we are not the leader
* @throws IllegalArgumentException if the request is out of date.
*/
- public synchronized void updateFunctionOnLeader(FunctionMetaData
functionMetaData, boolean delete)
+ public void updateFunctionOnLeader(FunctionMetaData functionMetaData,
boolean delete)
throws IllegalStateException, IllegalArgumentException {
- if (exclusiveLeaderProducer == null) {
- throw new IllegalStateException("Not the leader");
- }
boolean needsScheduling;
- if (delete) {
- needsScheduling = proccessDeregister(functionMetaData);
- } else {
- needsScheduling = processUpdate(functionMetaData);
- }
- byte[] toWrite;
- if (workerConfig.getUseCompactedMetadataTopic()) {
+ synchronized (this) {
+ if (exclusiveLeaderProducer == null) {
+ throw new IllegalStateException("Not the leader");
+ }
+
if (delete) {
- toWrite = "".getBytes();
+ needsScheduling = proccessDeregister(functionMetaData);
} else {
- toWrite = functionMetaData.toByteArray();
+ needsScheduling = processUpdate(functionMetaData);
}
- } else {
- Request.ServiceRequest serviceRequest =
Request.ServiceRequest.newBuilder()
- .setServiceRequestType(delete ?
Request.ServiceRequest.ServiceRequestType.DELETE :
Request.ServiceRequest.ServiceRequestType.UPDATE)
- .setFunctionMetaData(functionMetaData)
- .setWorkerId(workerConfig.getWorkerId())
- .setRequestId(UUID.randomUUID().toString())
- .build();
- toWrite = serviceRequest.toByteArray();
- }
- try {
- TypedMessageBuilder builder = exclusiveLeaderProducer.newMessage()
- .value(toWrite)
- .property(versionTag,
Long.toString(functionMetaData.getVersion()));
+ byte[] toWrite;
if (workerConfig.getUseCompactedMetadataTopic()) {
- builder =
builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+ if (delete) {
+ toWrite = "".getBytes();
+ } else {
+ toWrite = functionMetaData.toByteArray();
+ }
+ } else {
+ Request.ServiceRequest serviceRequest =
Request.ServiceRequest.newBuilder()
+ .setServiceRequestType(delete ?
Request.ServiceRequest.ServiceRequestType.DELETE
+ :
Request.ServiceRequest.ServiceRequestType.UPDATE)
+ .setFunctionMetaData(functionMetaData)
+ .setWorkerId(workerConfig.getWorkerId())
+ .setRequestId(UUID.randomUUID().toString())
+ .build();
+ toWrite = serviceRequest.toByteArray();
}
- lastMessageSeen = builder.send();
- } catch (Exception e) {
- log.error("Could not write into Function Metadata topic", e);
- throw new IllegalStateException("Internal Error updating function
at the leader", e);
+ try {
+ TypedMessageBuilder builder =
exclusiveLeaderProducer.newMessage()
+ .value(toWrite)
+ .property(versionTag,
Long.toString(functionMetaData.getVersion()));
+ if (workerConfig.getUseCompactedMetadataTopic()) {
+ builder =
builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails()));
+ }
+ lastMessageSeen = builder.send();
+ } catch (Exception e) {
+ log.error("Could not write into Function Metadata topic", e);
+ throw new IllegalStateException("Internal Error updating
function at the leader", e);
+ }
+
}
+
if (needsScheduling) {
this.schedulerManager.schedule();
}