C0urante commented on code in PR #14538: URL: https://github.com/apache/kafka/pull/14538#discussion_r1362527136
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java: ########## @@ -72,115 +41,42 @@ import static org.apache.kafka.connect.runtime.rest.RestServerConfig.LISTENERS_CONFIG; /** - * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, setup any tmp - * directories and clean up them on them. Methods on the same {@code EmbeddedConnectCluster} are + * Start an embedded connect cluster. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp + * directories, and clean them up on exit. Methods on the same {@code EmbeddedConnectCluster} are * not guaranteed to be thread-safe. */ -public class EmbeddedConnectCluster { +public class EmbeddedConnectCluster extends EmbeddedConnect { private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class); - public static final int DEFAULT_NUM_BROKERS = 1; public static final int DEFAULT_NUM_WORKERS = 1; - private static final Properties DEFAULT_BROKER_CONFIG = new Properties(); private static final String REST_HOST_NAME = "localhost"; private static final String DEFAULT_WORKER_NAME_PREFIX = "connect-worker-"; private final Set<WorkerHandle> connectCluster; - private final EmbeddedKafkaCluster kafkaCluster; - private final HttpClient httpClient; private final Map<String, String> workerProps; private final String connectClusterName; - private final int numBrokers; private final int numInitialWorkers; - private final boolean maskExitProcedures; private final String workerNamePrefix; private final AtomicInteger nextWorkerId = new AtomicInteger(0); - private final EmbeddedConnectClusterAssertions assertions; - // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader, - // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed - private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); - private EmbeddedConnectCluster(String name, Map<String, String> workerProps, int numWorkers, - int numBrokers, Properties brokerProps, - boolean maskExitProcedures, - Map<String, String> additionalKafkaClusterClientConfigs) { + private EmbeddedConnectCluster( + int numBrokers, + Properties brokerProps, + boolean maskExitProcedures, + Map<String, String> clientProps, + Map<String, String> workerProps, + String name, + int numWorkers + ) { + super(numBrokers, brokerProps, maskExitProcedures, clientProps); this.workerProps = workerProps; this.connectClusterName = name; - this.numBrokers = numBrokers; - this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, additionalKafkaClusterClientConfigs); this.connectCluster = new LinkedHashSet<>(); - this.httpClient = new HttpClient(); this.numInitialWorkers = numWorkers; - this.maskExitProcedures = maskExitProcedures; // leaving non-configurable for now this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX; - this.assertions = new EmbeddedConnectClusterAssertions(this); - } - - /** - * A more graceful way to handle abnormal exit of services in integration tests. - */ - public Exit.Procedure exitProcedure = (code, message) -> { - if (code != 0) { - String exitMessage = "Abrupt service exit with code " + code + " and message " + message; - log.warn(exitMessage); - throw new UngracefulShutdownException(exitMessage); - } - }; - - /** - * A more graceful way to handle abnormal halt of services in integration tests. - */ - public Exit.Procedure haltProcedure = (code, message) -> { - if (code != 0) { - String haltMessage = "Abrupt service halt with code " + code + " and message " + message; - log.warn(haltMessage); - throw new UngracefulShutdownException(haltMessage); - } - }; - - /** - * Start the connect cluster and the embedded Kafka and Zookeeper cluster. - */ - public void start() { - if (maskExitProcedures) { - Exit.setExitProcedure(exitProcedure); - Exit.setHaltProcedure(haltProcedure); - } - kafkaCluster.start(); - startConnect(); - try { - httpClient.start(); - } catch (Exception e) { - throw new ConnectException("Failed to start HTTP client", e); - } - } - - /** - * Stop the connect cluster and the embedded Kafka and Zookeeper cluster. - * Clean up any temp directories created locally. - * - * @throws RuntimeException if Kafka brokers fail to stop - */ - public void stop() { - Utils.closeQuietly(httpClient::stop, "HTTP client for embedded Connect cluster"); - connectCluster.forEach(this::stopWorker); - try { - kafkaCluster.stop(); - } catch (UngracefulShutdownException e) { - log.warn("Kafka did not shutdown gracefully"); - } catch (Exception e) { - log.error("Could not stop kafka", e); - throw new RuntimeException("Could not stop brokers", e); - } finally { - if (maskExitProcedures) { - Exit.resetExitProcedure(); - Exit.resetHaltProcedure(); - } - Plugins.compareAndSwapLoaders(originalClassLoader); - } Review Comment: This is all migrated directly (i.e., copy+pasted) to the `EmbeddedConnect` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org