This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 9f88b8ac02b8f65db5166ecf532d51912190bc55 Author: feynmanlin <[email protected]> AuthorDate: Sat Jul 24 04:48:02 2021 +0800 Reduce the probability of cache inconsistencies (#11423) Currently, when function metadata is updated, the local cache is updated first and the message is then sent to the metadata topic. If sending the message fails, the local cache has already been updated and no longer matches the topic. This change sends the message first and updates the local cache only after the send succeeds. (cherry picked from commit 5819242e2240deb834acc865ee0e9b79821992b1) --- .../functions/worker/FunctionMetaDataManager.java | 33 +++++++++-- .../worker/FunctionMetaDataManagerTest.java | 67 ++++++++++++++++++++-- 2 files changed, 90 insertions(+), 10 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java index b5ed5eb..1ff1b2b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java @@ -28,7 +28,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; - import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; @@ -212,12 +211,9 @@ public class FunctionMetaDataManager implements AutoCloseable { if (exclusiveLeaderProducer == null) { throw new IllegalStateException("Not the leader"); } + // Check first to avoid local cache update failure + checkRequestOutDated(functionMetaData, delete); - if (delete) { - needsScheduling = proccessDeregister(functionMetaData); - } else { - needsScheduling = processUpdate(functionMetaData); - } byte[] toWrite; if (workerConfig.getUseCompactedMetadataTopic()) { if (delete) { @@ -243,6 +239,11 @@ public class 
FunctionMetaDataManager implements AutoCloseable { builder = builder.key(FunctionCommon.getFullyQualifiedName(functionMetaData.getFunctionDetails())); } lastMessageSeen = builder.send(); + if (delete) { + needsScheduling = proccessDeregister(functionMetaData); + } else { + needsScheduling = processUpdate(functionMetaData); + } } catch (Exception e) { log.error("Could not write into Function Metadata topic", e); throw new IllegalStateException("Internal Error updating function at the leader", e); @@ -253,6 +254,22 @@ public class FunctionMetaDataManager implements AutoCloseable { } } + private void checkRequestOutDated(FunctionMetaData functionMetaData, boolean delete) { + Function.FunctionDetails details = functionMetaData.getFunctionDetails(); + if (isRequestOutdated(details.getTenant(), details.getNamespace(), + details.getName(), functionMetaData.getVersion())) { + if (log.isDebugEnabled()) { + log.debug("{}/{}/{} Ignoring outdated request version: {}", details.getTenant(), details.getNamespace(), + details.getName(), functionMetaData.getVersion()); + } + if (delete) { + throw new IllegalArgumentException( + "Delete request ignored because it is out of date. Please try again."); + } + throw new IllegalArgumentException("Update request ignored because it is out of date. Please try again."); + } + } + /** * Acquires a exclusive producer. This method cannot return null. It can only return a valid exclusive producer * or throw NotLeaderAnymore exception. 
@@ -455,6 +472,10 @@ public class FunctionMetaDataManager implements AutoCloseable { } private boolean isRequestOutdated(String tenant, String namespace, String functionName, long version) { + // avoid NPE + if(!containsFunctionMetaData(tenant, namespace, functionName)){ + return false; + } FunctionMetaData currentFunctionMetaData = this.functionMetaDataMap.get(tenant) .get(namespace).get(functionName); return currentFunctionMetaData.getVersion() >= version; diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java index 55f4222..3fef9f3 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java @@ -20,16 +20,31 @@ package org.apache.pulsar.functions.worker; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.AssertJUnit.assertEquals; +import static org.testng.AssertJUnit.assertTrue; +import static org.testng.AssertJUnit.fail; import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; - -import org.apache.pulsar.client.api.*; +import org.apache.pulsar.client.api.CompressionType; +import 
org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.ProducerBuilder; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Request; import org.testng.Assert; @@ -108,6 +123,50 @@ public class FunctionMetaDataManagerTest { } @Test + public void testSendMsgFailWithCompaction() throws Exception { + testSendMsgFail(true); + } + + @Test + public void testSendMsgFailWithoutCompaction() throws Exception { + testSendMsgFail(false); + } + + private void testSendMsgFail(boolean compact) throws Exception { + WorkerConfig workerConfig = new WorkerConfig(); + workerConfig.setWorkerId("worker-1"); + workerConfig.setUseCompactedMetadataTopic(compact); + FunctionMetaDataManager functionMetaDataManager = spy( + new FunctionMetaDataManager(workerConfig, + mock(SchedulerManager.class), + mockPulsarClient(), ErrorNotifier.getDefaultImpl())); + Function.FunctionMetaData m1 = Function.FunctionMetaData.newBuilder() + .setVersion(1) + .setFunctionDetails(Function.FunctionDetails.newBuilder().setName("func-1")).build(); + + // become leader + Producer<byte[]> exclusiveProducer = spy(functionMetaDataManager.acquireExclusiveWrite(() -> true)); + // make sure send msg fail + functionMetaDataManager.acquireLeadership(exclusiveProducer); + exclusiveProducer.close(); + when(exclusiveProducer.newMessage()).thenThrow(new RuntimeException("should failed")); + try { + functionMetaDataManager.updateFunctionOnLeader(m1, false); + fail("should failed"); + } catch (Exception e) { + assertTrue(e.getCause().getMessage().contains("should failed")); + } + assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0); + try { + functionMetaDataManager.updateFunctionOnLeader(m1, true); + fail("should failed"); + } catch 
(Exception e) { + assertTrue(e.getCause().getMessage().contains("should failed")); + } + assertEquals(functionMetaDataManager.getAllFunctionMetaData().size(), 0); + } + + @Test public void testUpdateIfLeaderFunctionWithoutCompaction() throws Exception { testUpdateIfLeaderFunction(false); }
