merlimat closed pull request #2438: Fix: Function assignment can support large number of topics URL: https://github.com/apache/incubator-pulsar/pull/2438
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 444b7fb9f6..497c414bc3 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,6 +27,7 @@ connectorsDirectory: ./connectors functionMetadataTopicName: metadata clusterCoordinationTopicName: coordinate pulsarFunctionsNamespace: public/functions +pulsarAssignmentNamespace: public/assignment pulsarFunctionsCluster: standalone pulsarServiceUrl: pulsar://localhost:6650 pulsarWebServiceUrl: http://localhost:8080 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index dd39222248..54c14c05b4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -105,6 +105,7 @@ WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment"; String primaryHost; String workerId; @@ -212,6 +213,7 @@ void shutdown() throws Exception { private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { workerConfig = new WorkerConfig(); workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); + workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace); workerConfig.setSchedulerClassName( org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 366eaba6a0..060120b10f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -20,11 +20,15 @@ import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.functions.proto.Request; +import com.google.common.collect.Lists; + import java.io.IOException; +import java.util.List; import java.util.function.Function; @Slf4j @@ -33,13 +37,22 @@ private final FunctionRuntimeManager functionRuntimeManager; private final Reader<byte[]> reader; + + private long currentVersion = 0; + private final List<Request.AssignmentsUpdate> currentVersionAssignments; + private volatile MessageId previousOldAssignmentMsgId = null; public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) throws PulsarClientException { - this.functionRuntimeManager = functionRuntimeManager; - this.reader = reader; + this.functionRuntimeManager = functionRuntimeManager; + this.reader = reader; + this.currentVersionAssignments = Lists.newArrayList(); + // complete init if reader has no message to read so, scheduled-manager can schedule assignments + if (!hasMessageAvailable()) { + this.functionRuntimeManager.initialized = true; } + } public void start() { @@ -66,29 +79,40 @@ public void close() { @Override public void accept(Message<byte[]> msg) { - // check if latest - boolean hasMessageAvailable; + Request.AssignmentsUpdate assignmentsUpdate; try { - hasMessageAvailable = this.reader.hasMessageAvailable(); - } catch (PulsarClientException e) { + assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); + } catch (IOException e) { + log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), + e); + // TODO: find a better way to handle bad request throw new RuntimeException(e); } - if (!hasMessageAvailable) { - Request.AssignmentsUpdate assignmentsUpdate; - try { - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); - } catch (IOException e) { - log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), - e); - // TODO: find a better way to handle bad request - throw new RuntimeException(e); - } - if (log.isDebugEnabled()) { - log.debug("Received assignment update: {}", assignmentsUpdate); + if (log.isDebugEnabled()) { + log.debug("Received assignment update: {}", assignmentsUpdate); + } + + // clear previous version assignments and ack all previous messages + if (currentVersion < assignmentsUpdate.getVersion()) { + currentVersionAssignments.clear(); + // ack the outdated version to avoid processing again + if (previousOldAssignmentMsgId != null && this.functionRuntimeManager.isActiveRuntimeConsumer.get()) { + this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId); } + } - this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), assignmentsUpdate); + currentVersionAssignments.add(assignmentsUpdate); + + // process only if the latest message + if (!hasMessageAvailable()) { + this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), currentVersionAssignments); + // function-runtime manager has processed all assignments in the topic at least once.. so scheduled-manager + // can only publish any new assignment with latest processed version + this.functionRuntimeManager.initialized = true; } + + currentVersion = assignmentsUpdate.getVersion(); + previousOldAssignmentMsgId = msg.getMessageId(); // receive next request receiveOne(); } @@ -99,4 +123,12 @@ public Void apply(Throwable cause) { // TODO: find a better way to handle consumer functions throw new RuntimeException(cause); } + + private boolean hasMessageAvailable() { + try { + return this.reader.hasMessageAvailable(); + } catch (PulsarClientException e) { + throw new RuntimeException(e); + } + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 93828de40d..452b51c52b 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,10 +26,16 @@ import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ErrorData; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -38,6 +44,7 @@ import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; +import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import javax.ws.rs.WebApplicationException; @@ -58,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -94,6 +102,11 @@ private final ConnectorsManager connectorsManager; private final PulsarAdmin functionAdmin; + + private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION = "pulsar.functions"; + protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false); + protected Consumer<byte[]> assignmentConsumer; + protected volatile boolean initialized = false; public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { @@ -103,7 +116,7 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer Reader<byte[]> reader = workerService.getClient().newReader() .topic(this.workerConfig.getFunctionAssignmentTopic()) - .startMessageId(MessageId.earliest) + .startMessageId(getLatestAssignmentMsgId(workerConfig, workerService)) .create(); this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); @@ -132,6 +145,20 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer } else { throw new RuntimeException("Either Thread or Process Container Factory need to be set"); } + + this.assignmentConsumer = workerService.getClient().newConsumer() + .topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION) + .subscriptionType(SubscriptionType.Failover).consumerEventListener(new ConsumerEventListener() { + private static final long serialVersionUID = 1L; + + public void becameActive(Consumer<?> consumer, int partitionId) { + isActiveRuntimeConsumer.set(true); + } + + public void becameInactive(Consumer<?> consumer, int partitionId) { + isActiveRuntimeConsumer.set(false); + } + }).subscribe(); this.actionQueue = new LinkedBlockingQueue<>(); @@ -141,6 +168,29 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer this.membershipManager = membershipManager; } + private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig, WorkerService workerService) { + try { + PulsarAdmin admin = workerService.getBrokerAdmin(); + PersistentTopicInternalStats topicStats = admin.topics() + .getInternalStats(workerConfig.getFunctionAssignmentTopic()); + if (topicStats != null && topicStats.cursors != null) { + CursorStats cursor = topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION); + if (cursor != null && StringUtils.isNotBlank(cursor.markDeletePosition)) { + String[] ids = cursor.markDeletePosition.split(":"); + if (ids.length == 2) { + MessageIdImpl msgId = new MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1); + log.info("Assignment-reader starts reading from {}", msgId); + return msgId; + } + } + } + } catch (Exception e) { + log.warn("Failed to get assignment-msg id for runtime-manager for {}-{}", + workerConfig.getFunctionAssignmentTopic(), ASSIGNMENT_TOPIC_SUBSCRIPTION, e); + } + return MessageId.earliest; + } + /** * Starts the function runtime manager */ @@ -450,6 +500,12 @@ private void stopFunction(String fullyQualifiedInstanceId, boolean restart) thro return functionStatusListBuilder.build(); } + public synchronized void processAssignmentUpdate(MessageId messageId, List<AssignmentsUpdate> assignmentsUpdates) { + assignmentsUpdates.forEach(assignmentsUpdate -> { + processAssignmentUpdate(messageId, assignmentsUpdate); + }); + } + /** * Process an assignment update from the assignment topic * @param messageId the message id of the update assignment @@ -548,9 +604,10 @@ public synchronized void processAssignmentUpdate(MessageId messageId, Assignment // set as current assignment this.currentAssignmentVersion = assignmentsUpdate.getVersion(); - } else { - log.debug("Received out of date assignment update: {}", assignmentsUpdate); + if (log.isDebugEnabled()) { + log.debug("Received out of date assignment update: {}", assignmentsUpdate); + } } } @@ -689,4 +746,8 @@ public void close() throws Exception { private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) { return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); } + + public boolean isInitialized() { + return initialized; + } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index b18fd12881..509c9251a5 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -43,6 +44,7 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; +import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index ed00958d7e..40ab1324d0 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -47,9 +47,13 @@ import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.scheduler.IScheduler; +import com.google.common.collect.Iterables; + @Slf4j public class SchedulerManager implements AutoCloseable { + private static final int MAX_ASSIGNMENTS_IN_MSG = 2000; + private final WorkerConfig workerConfig; @Setter @@ -99,6 +103,14 @@ public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) { } private void invokeScheduler() { + + // publish new assignment only once function-runtime manager updates previously updated version so, + // scheduled-manager can publish new assignment with updated version at FunctionRuntimeManager + if (!functionRuntimeManager.isInitialized()) { + schedule(); + return; + } + List<String> currentMembership = this.membershipManager.getCurrentMembership() .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList()); @@ -143,21 +155,12 @@ private void invokeScheduler() { List<Assignment> assignments = this.scheduler.schedule( needsAssignment, currentAssignments, currentMembership); - log.debug("New assignments computed: {}", assignments); + if (log.isDebugEnabled()) { + log.debug("New assignments computed: {}", assignments); + } long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1; - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .setVersion(assignmentVersion) - .addAllAssignments(assignments) - .build(); - - CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(assignmentsUpdate.toByteArray()); - try { - messageIdCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to send assignment update", e); - throw new RuntimeException(e); - } + publishAssignmentUpdate(assignmentVersion, assignments); // wait for assignment update to go throw the pipeline int retries = 0; @@ -176,6 +179,23 @@ private void invokeScheduler() { } } + private void publishAssignmentUpdate(long assignmentVersion, List<Assignment> assignments) { + + Iterable<List<Assignment>> batches = Iterables.partition(assignments, MAX_ASSIGNMENTS_IN_MSG); + batches.forEach(assignmentBatch -> { + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() + .setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build(); + CompletableFuture<MessageId> messageIdCompletableFuture = producer + .sendAsync(assignmentsUpdate.toByteArray()); + try { + messageIdCompletableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to send assignment update", e); + throw new RuntimeException(e); + } + }); + } + public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions) { Map<String, Function.Instance> functionInstances = new HashMap<>(); for (FunctionMetaData functionMetaData : allFunctions) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index cb73eaa16f..4d6ba5f7dc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -97,32 +97,10 @@ private static URI initialize(WorkerConfig workerConfig) // getting namespace policy log.info("Initializing Pulsar Functions namespace..."); try { - try { - admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e) { - if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - // if not found than create - try { - Policies policies = new Policies(); - policies.retention_policies = new RetentionPolicies(-1, -1); - policies.replication_clusters = new HashSet<>(); - policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster()); - admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), - policies); - } catch (PulsarAdminException e1) { - // prevent race condition with other workers starting up - if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { - log.error("Failed to create namespace {} for pulsar functions", workerConfig - .getPulsarFunctionsNamespace(), e1); - throw e1; - } - } - } else { - log.error("Failed to get retention policy for pulsar function namespace {}", - workerConfig.getPulsarFunctionsNamespace(), e); - throw e; - } - } + createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), workerConfig.getPulsarFunctionsNamespace(), + true); + createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), + workerConfig.getPulsarAssignmentNamespace(), false); try { internalConf = admin.brokers().getInternalConfigurationData(); } catch (PulsarAdminException e) { @@ -146,6 +124,35 @@ private static URI initialize(WorkerConfig workerConfig) } } + private static void createNamespace(PulsarAdmin admin, String cluster, String namespace, boolean infiniteRetention) + throws PulsarAdminException { + try { + admin.namespaces().getPolicies(namespace); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + // if not found than create + try { + Policies policies = new Policies(); + if (infiniteRetention) { + policies.retention_policies = new RetentionPolicies(-1, -1); + } + policies.replication_clusters = new HashSet<>(); + policies.replication_clusters.add(cluster); + admin.namespaces().createNamespace(namespace, policies); + } catch (PulsarAdminException e1) { + // prevent race condition with other workers starting up + if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { + log.error("Failed to create namespace {} for pulsar functions", namespace, e1); + throw e1; + } + } + } else { + log.error("Failed to get retention policy for pulsar function namespace {}", namespace, e); + throw e; + } + } + } + @Override protected void doStop() { if (null != this.server) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 0f695a974f..74ccd7c7dd 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -61,6 +61,7 @@ private String pulsarWebServiceUrl; private String clusterCoordinationTopicName; private String pulsarFunctionsNamespace; + private String pulsarAssignmentNamespace; private String pulsarFunctionsCluster; private int numFunctionPackageReplicas; private String downloadDirectory; @@ -133,7 +134,7 @@ public String getClusterCoordinationTopic() { } public String getFunctionAssignmentTopic() { - return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionAssignmentTopicName); + return String.format("persistent://%s/%s", pulsarAssignmentNamespace, functionAssignmentTopicName); } public static WorkerConfig load(String yamlFile) throws IOException { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index 85a2122dd0..c19d208511 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -20,10 +20,16 @@ import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.ConsumerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.Request; @@ -45,6 +51,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class FunctionRuntimeManagerTest { @@ -81,7 +88,7 @@ public void testProcessAssignmentUpdateAddFunctions() throws Exception { workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -178,7 +185,7 @@ public void testProcessAssignmentUpdateDeleteFunctions() throws Exception { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -279,7 +286,7 @@ public void testProcessAssignmentUpdateModifyFunctions() throws Exception { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - PulsarClient pulsarClient = mock(PulsarClient.class); + PulsarClient pulsarClient = mockPulsarClient(); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -394,4 +401,24 @@ public boolean matches(Object o) { Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); } + + private static PulsarClient mockPulsarClient() throws PulsarClientException { + PulsarClientImpl mockClient = mock(PulsarClientImpl.class); + + ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class); + ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); + + when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); + + when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); + + when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder); + + when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); + + return mockClient; + } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 2753bf196a..26e0540e4c 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -76,7 +76,6 @@ public void testConsumerEventListener() throws Exception { when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); WorkerService workerService = mock(WorkerService.class); doReturn(workerConfig).when(workerService).getWorkerConfig(); @@ -120,7 +119,7 @@ private static PulsarClient mockPulsarClient() throws PulsarClientException { when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder); when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); - + return mockClient; } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 19977bd812..60341240d1 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -111,6 +111,7 @@ public void setup() throws PulsarClientException { schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient)); functionRuntimeManager = mock(FunctionRuntimeManager.class); + when(functionRuntimeManager.isInitialized()).thenReturn(true); functionMetaDataManager = mock(FunctionMetaDataManager.class); membershipManager = mock(MembershipManager.class); schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); @@ -145,6 +146,7 @@ public void testSchedule() throws Exception { //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + doReturn(true).when(functionRuntimeManager).isInitialized(); // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); @@ -186,6 +188,7 @@ public void testNothingNewToSchedule() throws Exception { assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -242,6 +245,7 @@ public void testAddingFunctions() throws Exception { assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -313,6 +317,7 @@ public void testDeletingFunctions() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -372,6 +377,7 @@ public void testScalingUp() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -478,6 +484,7 @@ public void testScalingDown() throws Exception { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); + doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -646,6 +653,7 @@ public void testUpdate() throws Exception { //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); + doReturn(true).when(functionRuntimeManager).isInitialized(); // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
