This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new ce74b11 Revert "Fix: Function assignment can support large number of topics (#2438)" (#2474) ce74b11 is described below commit ce74b11b563462d18ae0068cf2d9d9ef0271165a Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu Aug 30 22:44:21 2018 -0700 Revert "Fix: Function assignment can support large number of topics (#2438)" (#2474) This reverts commit b283ebd3e51aa86f6b9f0a6607f430996f0032f7. --- conf/functions_worker.yml | 1 - .../org/apache/pulsar/io/PulsarSinkE2ETest.java | 2 - .../functions/worker/FunctionAssignmentTailer.java | 70 ++++++---------------- .../functions/worker/FunctionRuntimeManager.java | 67 +-------------------- .../pulsar/functions/worker/MembershipManager.java | 2 - .../pulsar/functions/worker/SchedulerManager.java | 46 ++++---------- .../org/apache/pulsar/functions/worker/Worker.java | 59 ++++++++---------- .../pulsar/functions/worker/WorkerConfig.java | 3 +- .../worker/FunctionRuntimeManagerTest.java | 33 +--------- .../functions/worker/MembershipManagerTest.java | 3 +- .../functions/worker/SchedulerManagerTest.java | 8 --- 11 files changed, 67 insertions(+), 227 deletions(-) diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml index 497c414..444b7fb 100644 --- a/conf/functions_worker.yml +++ b/conf/functions_worker.yml @@ -27,7 +27,6 @@ connectorsDirectory: ./connectors functionMetadataTopicName: metadata clusterCoordinationTopicName: coordinate pulsarFunctionsNamespace: public/functions -pulsarAssignmentNamespace: public/assignment pulsarFunctionsCluster: standalone pulsarServiceUrl: pulsar://localhost:6650 pulsarWebServiceUrl: http://localhost:8080 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java index 54c14c0..dd39222 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java +++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarSinkE2ETest.java @@ -105,7 +105,6 @@ public class PulsarSinkE2ETest { WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; - String pulsarAssignmentNamespace = tenant + "/use/pulsar-assignment"; String primaryHost; String workerId; @@ -213,7 +212,6 @@ public class PulsarSinkE2ETest { private WorkerService createPulsarFunctionWorker(ServiceConfiguration config) { workerConfig = new WorkerConfig(); workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace); - workerConfig.setPulsarAssignmentNamespace(pulsarAssignmentNamespace); workerConfig.setSchedulerClassName( org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName()); workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use")); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java index 060120b..366eaba 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionAssignmentTailer.java @@ -20,15 +20,11 @@ package org.apache.pulsar.functions.worker; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.functions.proto.Request; -import com.google.common.collect.Lists; - import java.io.IOException; -import java.util.List; import java.util.function.Function; @Slf4j @@ -37,22 +33,13 @@ public class FunctionAssignmentTailer private final FunctionRuntimeManager functionRuntimeManager; 
private final Reader<byte[]> reader; - - private long currentVersion = 0; - private final List<Request.AssignmentsUpdate> currentVersionAssignments; - private volatile MessageId previousOldAssignmentMsgId = null; public FunctionAssignmentTailer(FunctionRuntimeManager functionRuntimeManager, Reader<byte[]> reader) throws PulsarClientException { - this.functionRuntimeManager = functionRuntimeManager; - this.reader = reader; - this.currentVersionAssignments = Lists.newArrayList(); - // complete init if reader has no message to read so, scheduled-manager can schedule assignments - if (!hasMessageAvailable()) { - this.functionRuntimeManager.initialized = true; + this.functionRuntimeManager = functionRuntimeManager; + this.reader = reader; } - } public void start() { @@ -79,40 +66,29 @@ public class FunctionAssignmentTailer @Override public void accept(Message<byte[]> msg) { - Request.AssignmentsUpdate assignmentsUpdate; + // check if latest + boolean hasMessageAvailable; try { - assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); - } catch (IOException e) { - log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), - e); - // TODO: find a better way to handle bad request + hasMessageAvailable = this.reader.hasMessageAvailable(); + } catch (PulsarClientException e) { throw new RuntimeException(e); } - if (log.isDebugEnabled()) { - log.debug("Received assignment update: {}", assignmentsUpdate); - } - - // clear previous version assignments and ack all previous messages - if (currentVersion < assignmentsUpdate.getVersion()) { - currentVersionAssignments.clear(); - // ack the outdated version to avoid processing again - if (previousOldAssignmentMsgId != null && this.functionRuntimeManager.isActiveRuntimeConsumer.get()) { - this.functionRuntimeManager.assignmentConsumer.acknowledgeCumulativeAsync(previousOldAssignmentMsgId); + if (!hasMessageAvailable) { + Request.AssignmentsUpdate assignmentsUpdate; + try { + 
assignmentsUpdate = Request.AssignmentsUpdate.parseFrom(msg.getData()); + } catch (IOException e) { + log.error("[{}] Received bad assignment update at message {}", reader.getTopic(), msg.getMessageId(), + e); + // TODO: find a better way to handle bad request + throw new RuntimeException(e); + } + if (log.isDebugEnabled()) { + log.debug("Received assignment update: {}", assignmentsUpdate); } - } - currentVersionAssignments.add(assignmentsUpdate); - - // process only if the latest message - if (!hasMessageAvailable()) { - this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), currentVersionAssignments); - // function-runtime manager has processed all assignments in the topic at least once.. so scheduled-manager - // can only publish any new assignment with latest processed version - this.functionRuntimeManager.initialized = true; + this.functionRuntimeManager.processAssignmentUpdate(msg.getMessageId(), assignmentsUpdate); } - - currentVersion = assignmentsUpdate.getVersion(); - previousOldAssignmentMsgId = msg.getMessageId(); // receive next request receiveOne(); } @@ -123,12 +99,4 @@ public class FunctionAssignmentTailer // TODO: find a better way to handle consumer functions throw new RuntimeException(cause); } - - private boolean hasMessageAvailable() { - try { - return this.reader.hasMessageAvailable(); - } catch (PulsarClientException e) { - throw new RuntimeException(e); - } - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index 452b51c..93828de 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -26,16 +26,10 @@ import java.net.URISyntaxException; import lombok.extern.slf4j.Slf4j; import 
org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Reader; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.MessageIdImpl; import org.apache.pulsar.common.policies.data.ErrorData; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; -import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats; import org.apache.pulsar.functions.proto.Function.Assignment; import org.apache.pulsar.functions.instance.AuthenticationConfig; import org.apache.pulsar.functions.proto.InstanceCommunication; @@ -44,7 +38,6 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory; import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory; import org.apache.pulsar.functions.runtime.Runtime; import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory; -import org.inferred.freebuilder.shaded.org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.runtime.RuntimeSpawner; import javax.ws.rs.WebApplicationException; @@ -65,7 +58,6 @@ import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** @@ -102,11 +94,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ private final ConnectorsManager connectorsManager; private final PulsarAdmin functionAdmin; - - private static final String ASSIGNMENT_TOPIC_SUBSCRIPTION = "pulsar.functions"; - protected AtomicBoolean isActiveRuntimeConsumer = new AtomicBoolean(false); - protected Consumer<byte[]> assignmentConsumer; - protected 
volatile boolean initialized = false; public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerService, Namespace dlogNamespace, MembershipManager membershipManager, ConnectorsManager connectorsManager) throws Exception { @@ -116,7 +103,7 @@ public class FunctionRuntimeManager implements AutoCloseable{ Reader<byte[]> reader = workerService.getClient().newReader() .topic(this.workerConfig.getFunctionAssignmentTopic()) - .startMessageId(getLatestAssignmentMsgId(workerConfig, workerService)) + .startMessageId(MessageId.earliest) .create(); this.functionAssignmentTailer = new FunctionAssignmentTailer(this, reader); @@ -145,20 +132,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ } else { throw new RuntimeException("Either Thread or Process Container Factory need to be set"); } - - this.assignmentConsumer = workerService.getClient().newConsumer() - .topic(this.workerConfig.getFunctionAssignmentTopic()).subscriptionName(ASSIGNMENT_TOPIC_SUBSCRIPTION) - .subscriptionType(SubscriptionType.Failover).consumerEventListener(new ConsumerEventListener() { - private static final long serialVersionUID = 1L; - - public void becameActive(Consumer<?> consumer, int partitionId) { - isActiveRuntimeConsumer.set(true); - } - - public void becameInactive(Consumer<?> consumer, int partitionId) { - isActiveRuntimeConsumer.set(false); - } - }).subscribe(); this.actionQueue = new LinkedBlockingQueue<>(); @@ -168,29 +141,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ this.membershipManager = membershipManager; } - private MessageId getLatestAssignmentMsgId(WorkerConfig workerConfig, WorkerService workerService) { - try { - PulsarAdmin admin = workerService.getBrokerAdmin(); - PersistentTopicInternalStats topicStats = admin.topics() - .getInternalStats(workerConfig.getFunctionAssignmentTopic()); - if (topicStats != null && topicStats.cursors != null) { - CursorStats cursor = topicStats.cursors.get(ASSIGNMENT_TOPIC_SUBSCRIPTION); - if 
(cursor != null && StringUtils.isNotBlank(cursor.markDeletePosition)) { - String[] ids = cursor.markDeletePosition.split(":"); - if (ids.length == 2) { - MessageIdImpl msgId = new MessageIdImpl(Long.parseLong(ids[0]), Long.parseLong(ids[1]), -1); - log.info("Assignment-reader starts reading from {}", msgId); - return msgId; - } - } - } - } catch (Exception e) { - log.warn("Failed to get assignment-msg id for runtime-manager for {}-{}", - workerConfig.getFunctionAssignmentTopic(), ASSIGNMENT_TOPIC_SUBSCRIPTION, e); - } - return MessageId.earliest; - } - /** * Starts the function runtime manager */ @@ -500,12 +450,6 @@ public class FunctionRuntimeManager implements AutoCloseable{ return functionStatusListBuilder.build(); } - public synchronized void processAssignmentUpdate(MessageId messageId, List<AssignmentsUpdate> assignmentsUpdates) { - assignmentsUpdates.forEach(assignmentsUpdate -> { - processAssignmentUpdate(messageId, assignmentsUpdate); - }); - } - /** * Process an assignment update from the assignment topic * @param messageId the message id of the update assignment @@ -604,10 +548,9 @@ public class FunctionRuntimeManager implements AutoCloseable{ // set as current assignment this.currentAssignmentVersion = assignmentsUpdate.getVersion(); + } else { - if (log.isDebugEnabled()) { - log.debug("Received out of date assignment update: {}", assignmentsUpdate); - } + log.debug("Received out of date assignment update: {}", assignmentsUpdate); } } @@ -746,8 +689,4 @@ public class FunctionRuntimeManager implements AutoCloseable{ private FunctionRuntimeInfo getFunctionRuntimeInfo(String fullyQualifiedInstanceId) { return this.functionRuntimeInfoMap.get(fullyQualifiedInstanceId); } - - public boolean isInitialized() { - return initialized; - } } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java index 
509c925..b18fd12 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/MembershipManager.java @@ -21,7 +21,6 @@ package org.apache.pulsar.functions.worker; import com.google.common.annotations.VisibleForTesting; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -44,7 +43,6 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.ConsumerImpl; import org.apache.pulsar.common.policies.data.ConsumerStats; -import org.apache.pulsar.common.policies.data.SubscriptionStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java index 40ab132..ed00958 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java @@ -47,13 +47,9 @@ import org.apache.pulsar.functions.proto.Request; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.worker.scheduler.IScheduler; -import com.google.common.collect.Iterables; - @Slf4j public class SchedulerManager implements AutoCloseable { - private static final int MAX_ASSIGNMENTS_IN_MSG = 2000; - private final WorkerConfig workerConfig; @Setter @@ -103,14 +99,6 @@ public class SchedulerManager implements AutoCloseable { } private void invokeScheduler() { - - // publish new assignment only once function-runtime manager updates 
previously updated version so, - // scheduled-manager can publish new assignment with updated version at FunctionRuntimeManager - if (!functionRuntimeManager.isInitialized()) { - schedule(); - return; - } - List<String> currentMembership = this.membershipManager.getCurrentMembership() .stream().map(workerInfo -> workerInfo.getWorkerId()).collect(Collectors.toList()); @@ -155,12 +143,21 @@ public class SchedulerManager implements AutoCloseable { List<Assignment> assignments = this.scheduler.schedule( needsAssignment, currentAssignments, currentMembership); - if (log.isDebugEnabled()) { - log.debug("New assignments computed: {}", assignments); - } + log.debug("New assignments computed: {}", assignments); long assignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1; - publishAssignmentUpdate(assignmentVersion, assignments); + Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() + .setVersion(assignmentVersion) + .addAllAssignments(assignments) + .build(); + + CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(assignmentsUpdate.toByteArray()); + try { + messageIdCompletableFuture.get(); + } catch (InterruptedException | ExecutionException e) { + log.error("Failed to send assignment update", e); + throw new RuntimeException(e); + } // wait for assignment update to go throw the pipeline int retries = 0; @@ -179,23 +176,6 @@ public class SchedulerManager implements AutoCloseable { } } - private void publishAssignmentUpdate(long assignmentVersion, List<Assignment> assignments) { - - Iterable<List<Assignment>> batches = Iterables.partition(assignments, MAX_ASSIGNMENTS_IN_MSG); - batches.forEach(assignmentBatch -> { - Request.AssignmentsUpdate assignmentsUpdate = Request.AssignmentsUpdate.newBuilder() - .setVersion(assignmentVersion).addAllAssignments(assignmentBatch).build(); - CompletableFuture<MessageId> messageIdCompletableFuture = producer - .sendAsync(assignmentsUpdate.toByteArray()); - 
try { - messageIdCompletableFuture.get(); - } catch (InterruptedException | ExecutionException e) { - log.error("Failed to send assignment update", e); - throw new RuntimeException(e); - } - }); - } - public static Map<String, Function.Instance> computeAllInstances(List<FunctionMetaData> allFunctions) { Map<String, Function.Instance> functionInstances = new HashMap<>(); for (FunctionMetaData functionMetaData : allFunctions) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 4d6ba5f..cb73eaa 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -97,10 +97,32 @@ public class Worker extends AbstractService { // getting namespace policy log.info("Initializing Pulsar Functions namespace..."); try { - createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), workerConfig.getPulsarFunctionsNamespace(), - true); - createNamespace(admin, workerConfig.getPulsarFunctionsCluster(), - workerConfig.getPulsarAssignmentNamespace(), false); + try { + admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + // if not found than create + try { + Policies policies = new Policies(); + policies.retention_policies = new RetentionPolicies(-1, -1); + policies.replication_clusters = new HashSet<>(); + policies.replication_clusters.add(workerConfig.getPulsarFunctionsCluster()); + admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace(), + policies); + } catch (PulsarAdminException e1) { + // prevent race condition with other workers starting up + if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { + log.error("Failed to create namespace {} for 
pulsar functions", workerConfig + .getPulsarFunctionsNamespace(), e1); + throw e1; + } + } + } else { + log.error("Failed to get retention policy for pulsar function namespace {}", + workerConfig.getPulsarFunctionsNamespace(), e); + throw e; + } + } try { internalConf = admin.brokers().getInternalConfigurationData(); } catch (PulsarAdminException e) { @@ -124,35 +146,6 @@ public class Worker extends AbstractService { } } - private static void createNamespace(PulsarAdmin admin, String cluster, String namespace, boolean infiniteRetention) - throws PulsarAdminException { - try { - admin.namespaces().getPolicies(namespace); - } catch (PulsarAdminException e) { - if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - // if not found than create - try { - Policies policies = new Policies(); - if (infiniteRetention) { - policies.retention_policies = new RetentionPolicies(-1, -1); - } - policies.replication_clusters = new HashSet<>(); - policies.replication_clusters.add(cluster); - admin.namespaces().createNamespace(namespace, policies); - } catch (PulsarAdminException e1) { - // prevent race condition with other workers starting up - if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { - log.error("Failed to create namespace {} for pulsar functions", namespace, e1); - throw e1; - } - } - } else { - log.error("Failed to get retention policy for pulsar function namespace {}", namespace, e); - throw e; - } - } - } - @Override protected void doStop() { if (null != this.server) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java index 74ccd7c..0f695a9 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java @@ -61,7 +61,6 @@ public class WorkerConfig 
implements Serializable, PulsarConfiguration { private String pulsarWebServiceUrl; private String clusterCoordinationTopicName; private String pulsarFunctionsNamespace; - private String pulsarAssignmentNamespace; private String pulsarFunctionsCluster; private int numFunctionPackageReplicas; private String downloadDirectory; @@ -134,7 +133,7 @@ public class WorkerConfig implements Serializable, PulsarConfiguration { } public String getFunctionAssignmentTopic() { - return String.format("persistent://%s/%s", pulsarAssignmentNamespace, functionAssignmentTopicName); + return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionAssignmentTopicName); } public static WorkerConfig load(String yamlFile) throws IOException { diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java index c19d208..85a2122 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java @@ -20,16 +20,10 @@ package org.apache.pulsar.functions.worker; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.ConsumerEventListener; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; -import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Reader; import org.apache.pulsar.client.api.ReaderBuilder; -import org.apache.pulsar.client.api.SubscriptionType; -import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.functions.proto.Function; import 
org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.Request; @@ -51,7 +45,6 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; public class FunctionRuntimeManagerTest { @@ -88,7 +81,7 @@ public class FunctionRuntimeManagerTest { workerConfig.setStateStorageServiceUrl("foo"); workerConfig.setFunctionAssignmentTopicName("assignments"); - PulsarClient pulsarClient = mockPulsarClient(); + PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -185,7 +178,7 @@ public class FunctionRuntimeManagerTest { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - PulsarClient pulsarClient = mockPulsarClient(); + PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -286,7 +279,7 @@ public class FunctionRuntimeManagerTest { workerConfig.setPulsarServiceUrl("pulsar://localhost:6650"); workerConfig.setStateStorageServiceUrl("foo"); - PulsarClient pulsarClient = mockPulsarClient(); + PulsarClient pulsarClient = mock(PulsarClient.class); ReaderBuilder readerBuilder = mock(ReaderBuilder.class); doReturn(readerBuilder).when(pulsarClient).newReader(); doReturn(readerBuilder).when(readerBuilder).topic(anyString()); @@ -401,24 +394,4 @@ public class FunctionRuntimeManagerTest { Assert.assertEquals(functionRuntimeManager.workerIdToAssignments .get("worker-1").get("test-tenant/test-namespace/func-2:0"), assignment3); } - - private static PulsarClient mockPulsarClient() throws 
PulsarClientException { - PulsarClientImpl mockClient = mock(PulsarClientImpl.class); - - ConsumerImpl<byte[]> mockConsumer = mock(ConsumerImpl.class); - ConsumerBuilder<byte[]> mockConsumerBuilder = mock(ConsumerBuilder.class); - - when(mockConsumerBuilder.topic(anyString())).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); - when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); - - when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); - - when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder); - - when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); - - return mockClient; - } } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java index 26e0540..2753bf1 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java @@ -76,6 +76,7 @@ public class MembershipManagerTest { when(mockConsumerBuilder.subscriptionName(anyString())).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.subscriptionType(any(SubscriptionType.class))).thenReturn(mockConsumerBuilder); when(mockConsumerBuilder.property(anyString(), anyString())).thenReturn(mockConsumerBuilder); + when(mockConsumerBuilder.subscribe()).thenReturn(mockConsumer); WorkerService workerService = mock(WorkerService.class); doReturn(workerConfig).when(workerService).getWorkerConfig(); @@ -119,7 +120,7 @@ public class MembershipManagerTest { 
when(mockConsumerBuilder.consumerEventListener(any(ConsumerEventListener.class))).thenReturn(mockConsumerBuilder); when(mockClient.newConsumer()).thenReturn(mockConsumerBuilder); - + return mockClient; } diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java index 6034124..19977bd 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java @@ -111,7 +111,6 @@ public class SchedulerManagerTest { schedulerManager = spy(new SchedulerManager(workerConfig, pulsarClient)); functionRuntimeManager = mock(FunctionRuntimeManager.class); - when(functionRuntimeManager.isInitialized()).thenReturn(true); functionMetaDataManager = mock(FunctionMetaDataManager.class); membershipManager = mock(MembershipManager.class); schedulerManager.setFunctionMetaDataManager(functionMetaDataManager); @@ -146,7 +145,6 @@ public class SchedulerManagerTest { //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - doReturn(true).when(functionRuntimeManager).isInitialized(); // single node List<WorkerInfo> workerInfoList = new LinkedList<>(); @@ -188,7 +186,6 @@ public class SchedulerManagerTest { assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -245,7 +242,6 @@ public class SchedulerManagerTest { assignmentEntry1.put(Utils.getFullyQualifiedInstanceId(assignment1.getInstance()), assignment1); 
currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -317,7 +313,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -377,7 +372,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -484,7 +478,6 @@ public class SchedulerManagerTest { currentAssignments.put("worker-1", assignmentEntry1); doReturn(currentAssignments).when(functionRuntimeManager).getCurrentAssignments(); - doReturn(true).when(functionRuntimeManager).isInitialized(); //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); @@ -653,7 +646,6 @@ public class SchedulerManagerTest { //set version doReturn(version).when(functionRuntimeManager).getCurrentAssignmentVersion(); - doReturn(true).when(functionRuntimeManager).isInitialized(); // single node List<WorkerInfo> workerInfoList = new LinkedList<>();