This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository 
https://gitbox.apache.org/repos/asf/pekko-persistence-cassandra.git


The following commit(s) were added to refs/heads/main by this push:
     new d89f0d5  update tests to use testcontainers (#404)
d89f0d5 is described below

commit d89f0d51dda68f4a619269e2574265fc5c47b645
Author: PJ Fanning <[email protected]>
AuthorDate: Mon May 25 09:47:58 2026 +0100

    update tests to use testcontainers (#404)
    
    * Replace CassandraLauncher with testcontainers in ReconnectSpec and 
EventsByTagMultiJvmSpec
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-cassandra/sessions/4e9c6664-673c-4179-9825-f6e7a804d9fe
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Address code review: add @volatile, add comment on host parameter
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-persistence-cassandra/sessions/4e9c6664-673c-4179-9825-f6e7a804d9fe
    
    Co-authored-by: pjfanning <[email protected]>
    
    * refactor
    
    * remove cassandra-launcher
    
    * Delete CassandraLauncherSpec.scala
    
    * avoid deprecated classes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 build.sbt                                          |  38 +-
 .../src/main/resources/log4j.properties            |   4 -
 cassandra-bundle/src/main/resources/logback.xml    |  17 -
 cassandra-launcher/.gitignore                      |   1 -
 .../cassandra/testkit/CassandraLauncher.scala      | 428 ---------------------
 .../cassandra/EventsByTagMultiJvmSpec.scala        |  43 ++-
 .../persistence/cassandra/ReconnectSpec.scala      |  14 +-
 .../cassandra/query/EventsByTagSpec.scala          |   8 +-
 .../cassandra/testkit/CassandraLauncherSpec.scala  |  74 ----
 project/Dependencies.scala                         |   5 +-
 10 files changed, 40 insertions(+), 592 deletions(-)

diff --git a/build.sbt b/build.sbt
index 76fe5c1..bfeb188 100644
--- a/build.sbt
+++ b/build.sbt
@@ -23,7 +23,7 @@ lazy val root = project
   .in(file("."))
   .enablePlugins(Common, ScalaUnidocPlugin)
   .disablePlugins(SitePlugin, MimaPlugin)
-  .aggregate(core, cassandraLauncher)
+  .aggregate(core)
   .settings(name := "pekko-persistence-cassandra-root", publish / skip := true)
 
 lazy val dumpSchema = taskKey[Unit]("Dumps cassandra schema for docs")
@@ -32,7 +32,6 @@ dumpSchema := (core / Test / runMain).toTask(" 
org.apache.pekko.persistence.cass
 lazy val core = project
   .in(file("core"))
   .enablePlugins(Common, AutomateHeaderPlugin, MimaPlugin, MultiJvmPlugin, 
ReproducibleBuildsPlugin)
-  .dependsOn(cassandraLauncher % Test)
   .addPekkoModuleDependency("pekko-connectors-cassandra", "", 
PekkoConnectorsDependency.default)
   .addPekkoModuleDependency("pekko-persistence", "", 
PekkoCoreDependency.default)
   .addPekkoModuleDependency("pekko-persistence-query", "", 
PekkoCoreDependency.default)
@@ -55,41 +54,6 @@ lazy val core = project
       organization.value %% name.value % mimaCompareVersion))
   .configs(MultiJvm)
 
-lazy val cassandraLauncher = project
-  .in(file("cassandra-launcher"))
-  .enablePlugins(Common, ReproducibleBuildsPlugin)
-  .disablePlugins(MimaPlugin)
-  .settings(
-    name := "pekko-persistence-cassandra-launcher",
-    Compile / unmanagedResources += (cassandraBundle / Compile / 
packageBin).value)
-
-// This project doesn't get published directly, rather the assembled artifact 
is included as part of cassandraLaunchers
-// resources
-lazy val cassandraBundle = project
-  .in(file("cassandra-bundle"))
-  .enablePlugins(Common, AutomateHeaderPlugin)
-  .disablePlugins(MimaPlugin)
-  .settings(
-    name := "pekko-persistence-cassandra-bundle",
-    crossPaths := false,
-    autoScalaLibrary := false,
-    libraryDependencies += ("org.apache.cassandra" % "cassandra-all" % 
"3.11.3")
-      .exclude("commons-logging", "commons-logging"),
-    dependencyOverrides += "com.github.jbellis" % "jamm" % "0.3.3", // See 
jamm comment in https://issues.apache.org/jira/browse/CASSANDRA-9608
-    assembly / assemblyJarName := "cassandra-bundle.jar",
-    Compile / packageBin := Def.taskDyn {
-      val store = streams.value.cacheStoreFactory.make("shaded-output")
-      val uberJarLocation = (assembly / assemblyOutputPath).value
-      val tracker = Tracked.outputChanged(store) { (changed: Boolean, file: 
File) =>
-        if (changed) {
-          Def.task {
-            (Compile / assembly).value
-          }
-        } else Def.task { file }
-      }
-      tracker(() => uberJarLocation)
-    }.value)
-
 // Used for testing events by tag in various environments
 lazy val endToEndExample = project
   .in(file("example"))
diff --git a/cassandra-bundle/src/main/resources/log4j.properties 
b/cassandra-bundle/src/main/resources/log4j.properties
deleted file mode 100644
index 1db758a..0000000
--- a/cassandra-bundle/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,4 +0,0 @@
-log4j.rootLogger=ERROR, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.SimpleLayout
-log4j.logger.org.apache.cassandra.service.CassandraDaemon=OFF
diff --git a/cassandra-bundle/src/main/resources/logback.xml 
b/cassandra-bundle/src/main/resources/logback.xml
deleted file mode 100644
index 70c10dc..0000000
--- a/cassandra-bundle/src/main/resources/logback.xml
+++ /dev/null
@@ -1,17 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
-
-    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
-        <target>System.out</target>
-        <encoder>
-            <pattern>%date{MM/dd HH:mm:ss} %-5level[%thread] %logger{1} - 
%m%n%xException</pattern>
-        </encoder>
-    </appender>
-
-    <logger name="org.apache.cassandra" level="ERROR" />
-
-    <root level="INFO">
-        <appender-ref ref="CONSOLE"/>
-    </root>
-
-</configuration>
\ No newline at end of file
diff --git a/cassandra-launcher/.gitignore b/cassandra-launcher/.gitignore
deleted file mode 100644
index 4fae3a9..0000000
--- a/cassandra-launcher/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-/.target/
diff --git 
a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
 
b/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
deleted file mode 100644
index bc61dd6..0000000
--- 
a/cassandra-launcher/src/main/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncher.scala
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.persistence.cassandra.testkit
-
-import java.io._
-import java.net.{ InetSocketAddress, Socket, URI }
-import java.nio.channels.ServerSocketChannel
-import java.nio.file.Files
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
-import scala.annotation.varargs
-import scala.collection.immutable
-import scala.concurrent.duration._
-import scala.util.control.NonFatal
-import scala.util.{ Failure, Try }
-
-/**
- * Starts Cassandra in current JVM. There can only be one Cassandra instance 
per JVM,
- * but keyspaces can be used for isolation.
- */
-object CassandraLauncher {
-
-  class CleanFailedException(message: String, cause: Throwable) extends 
RuntimeException(message, cause)
-
-  private val ForcedShutdownTimeout = 20.seconds
-  // Used in fork mode to wait for Cassandra to start listening
-  private val AwaitListenTimeout = 45.seconds
-  private val AwaitListenPoll = 100.millis
-
-  /**
-   * Default config for testing "test-embedded-cassandra.yaml"
-   */
-  val DefaultTestConfigResource: String = "test-embedded-cassandra.yaml"
-
-  /**
-   * Main method to start Cassandra, see [[#start]].
-   * Note that `cassandra-all` jar must be in classpath.
-   *
-   * `port can be defined with `-DCassandraLauncher.port=4000`,
-   *   default is the `randomPort`
-   * `clean` can be defined with `-DCassandraLauncher.clean=true`,
-   *   default is `false`
-   * `directory` can be defined with 
`-DCassandraLauncher.directory=target/embedded-cassandra`,
-   *   default is `target/embedded-cassandra`
-   * `configResource` yaml configuration loaded from classpath,
-   *   can be defined with 
`-DCassandraLauncher.configResource=test-embedded-cassandra.yaml`,
-   *   default is defined in [[CassandraLauncher#DefaultTestConfigResource]],
-   *   i.e. `test-embedded-cassandra.yaml`
-   */
-  def main(args: Array[String]): Unit = {
-    val port: Int =
-      if (args.length > 0) args(0).toInt
-      else Integer.getInteger("CassandraLauncher.port", 0)
-    val clean =
-      if (args.length > 1) args(1).toBoolean
-      else java.lang.Boolean.getBoolean("CassandraLauncher.clean")
-    val dir =
-      if (args.length > 2) new File(args(2))
-      else
-        new File(System.getProperty("CassandraLauncher.directory", 
"target/embedded-cassandra"))
-    val configResource =
-      if (args.length > 3) args(3)
-      else
-        System.getProperty("CassandraLauncher.configResource", 
DefaultTestConfigResource)
-    start(dir, configResource, clean, port)
-  }
-
-  private var cassandraDaemon: Option[Closeable] = None
-
-  private val DEFAULT_HOST = "127.0.0.1"
-
-  private val initialPortsValue = (0, 0)
-  private val selectedPorts: AtomicReference[(Int, Int)] = new 
AtomicReference(initialPortsValue)
-
-  /**
-   * The random free port that will be used if `port=0` is
-   * specified in the `start` method.
-   *
-   * Calling `randomPort` before `start` is not recommended. It will fix the 
value and won't necessarily
-   * reflect the value that is effectively used by the launcher.
-   */
-  lazy val randomPort: Int = {
-    selectedPorts.compareAndSet(initialPortsValue, 
selectFreePorts(DEFAULT_HOST, 0))
-    selectedPorts.get()._1
-  }
-
-  /**
-   * Select two free ports.
-   * Note that requestPort is always used even if user requested a fixed port. 
We want to make sure the port is not in use
-   * and won't conflict with the storagePort
-   */
-  private def selectFreePorts(host: String, requestedPort: Int): (Int, Int) = {
-
-    val clientSocket = ServerSocketChannel.open().socket()
-    val storageSocket = ServerSocketChannel.open().socket()
-
-    try {
-      clientSocket.bind(new InetSocketAddress(host, requestedPort))
-      storageSocket.bind(new InetSocketAddress(host, 0))
-
-      // return both ports
-      (clientSocket.getLocalPort, storageSocket.getLocalPort)
-
-    } finally {
-      // close independently
-      val t1 = Try(clientSocket.close())
-      val t2 = Try(storageSocket.close())
-
-      // if one the two failed, we should throw the exception
-      (t1, t2) match {
-        case (Failure(ex1), Failure(ex2)) =>
-          throw new RuntimeException(
-            s"Failed to close sockets: client '${ex1.getMessage}', storage 
'${ex2.getMessage}'")
-        case (Failure(ex1), _) => throw new RuntimeException(s"Failed to close 
client-port socket: '${ex1.getMessage}'")
-        case (_, Failure(ex2)) =>
-          throw new RuntimeException(s"Failed to close storage-port socket: 
'${ex2.getMessage}'")
-        case (_, _) => // we are fine, all closed
-      }
-
-    }
-
-  }
-
-  /**
-   * Use this to locate classpath elements from the current classpath to add
-   * to the classpath of the launched Cassandra.
-   *
-   * This is particularly useful if you want a custom logging, you can use
-   * this to ensure that the directory that your log file is in is on the
-   * classpath of the forked Cassandra process, for example:
-   *
-   * ```
-   * CassandraLauncher.start(
-   *   cassandraDirectory,
-   *   CassandraLauncher.DefaultTestConfigResource,
-   *   clean = true,
-   *   port = 0,
-   *   CassandraLauncher.classpathForResources("logback.xml")
-   * )
-   * ```
-   *
-   * Files ending with `assembly.jar` are not included in the result because an
-   * assembly jar will likely contain incompatible classes that shouldn't be 
on the
-   * classpath of the Cassandra server, such as incompatible dependency of 
Guava.
-   * Assembly jars are used when running multi-node testing.
-   */
-  @varargs
-  def classpathForResources(resources: String*): immutable.Seq[String] = {
-    resources
-      .map { resource =>
-        this.getClass.getClassLoader.getResource(resource) match {
-          case null =>
-            sys.error("Resource not found: " + resource)
-          case fileUrl if fileUrl.getProtocol == "file" =>
-            new 
File(URI.create(fileUrl.toString.stripSuffix(resource))).getCanonicalPath
-          case jarUrl if jarUrl.getProtocol == "jar" =>
-            new File(URI.create(jarUrl.getPath.takeWhile(_ != 
'!'))).getCanonicalPath
-          case _ =>
-            sys.error("Resource not supported: " + resource)
-        }
-      }
-      .distinct
-      .toList
-      .filterNot(_.endsWith("assembly.jar")) // TODO required?
-  }
-
-  /**
-   * Start Cassandra
-   *
-   * @param cassandraDirectory the data directory to use
-   * @param configResource yaml configuration loaded from classpath,
-   *   default configuration for testing is defined in 
[[CassandraLauncher#DefaultTestConfigResource]]
-   * @param clean if `true` all files in the data directory will be deleted
-   *   before starting Cassandra
-   * @param port the `native_transport_port` to use, if 0 a random
-   *   free port is used, which can be retrieved (before starting)
-   *   with [[CassandraLauncher.randomPort]].
-   * @throws 
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
 if `clean`
-   *   is `true` and removal of the directory fails
-   */
-  def start(cassandraDirectory: File, configResource: String, clean: Boolean, 
port: Int): Unit =
-    start(cassandraDirectory, configResource, clean, port, Nil)
-
-  /**
-   * Start Cassandra
-   *
-   * @param cassandraDirectory the data directory to use
-   * @param configResource yaml configuration loaded from classpath,
-   *   default configuration for testing is defined in 
[[CassandraLauncher#DefaultTestConfigResource]]
-   * @param clean if `true` all files in the data directory will be deleted
-   *   before starting Cassandra
-   * @param port the `native_transport_port` to use, if 0 a random
-   *   free port is used, which can be retrieved (before starting)
-   *   with [[CassandraLauncher.randomPort]].
-   * @param classpath Any additional jars/directories to add to the classpath. 
Use
-   *                  [[CassandraLauncher#classpathForResources]] to assist in 
calculating this.
-   * @throws 
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
 if `clean`
-   *   is `true` and removal of the directory fails
-   */
-  def start(
-      cassandraDirectory: File,
-      configResource: String,
-      clean: Boolean,
-      port: Int,
-      classpath: immutable.Seq[String]): Unit =
-    start(cassandraDirectory, configResource, clean, port, classpath, None)
-
-  /**
-   * Start Cassandra
-   *
-   * @param cassandraDirectory the data directory to use
-   * @param configResource yaml configuration loaded from classpath,
-   *   default configuration for testing is defined in 
[[CassandraLauncher#DefaultTestConfigResource]]
-   * @param clean if `true` all files in the data directory will be deleted
-   *   before starting Cassandra
-   * @param port the `native_transport_port` to use, if 0 a random
-   *   free port is used, which can be retrieved (before starting)
-   *   with [[CassandraLauncher.randomPort]].
-   * @param classpath Any additional jars/directories to add to the classpath. 
Use
-   *                  [[CassandraLauncher#classpathForResources]] to assist in 
calculating this.
-   * @param host the host to bind the embeded Cassandra to. If None, then 
127.0.0.1 is used.
-   * @throws 
org.apache.pekko.persistence.cassandra.testkit.CassandraLauncher.CleanFailedException
 if `clean`
-   *   is `true` and removal of the directory fails
-   */
-  def start(
-      cassandraDirectory: File,
-      configResource: String,
-      clean: Boolean,
-      port: Int,
-      classpath: immutable.Seq[String],
-      host: Option[String]): Unit = this.synchronized {
-    if (cassandraDaemon.isEmpty) {
-
-      prepareCassandraDirectory(cassandraDirectory, clean)
-
-      val realHost = host.getOrElse(DEFAULT_HOST)
-
-      // NOTE: read comments bellow to get the full picture
-      if (port != 0) {
-        // if user explicitly passes a port, we should use it and override 
`selectedPorts`
-        // selectedPorts may have been already set if user has previously 
called `randomPort`
-        // in such a case, randomPort will be fixed to a 'wrong' old number
-        selectedPorts.set(selectFreePorts(realHost, port))
-      } else {
-        // if a random port is requested, we only override `selectedPorts` if 
not yet calculated (eg: default (0,0)).
-        // If user has previously called `randomPort`, we will already have a 
value and we should keep using it.
-        selectedPorts.compareAndSet(initialPortsValue, 
selectFreePorts(realHost, port))
-      }
-
-      val (realPort, storagePort) = selectedPorts.get()
-
-      println(
-        s"Starting Cassandra on port client port: $realPort storage port 
$storagePort host $realHost java version ${System
-            .getProperty("java.runtime.version")}")
-
-      // http://wiki.apache.org/cassandra/StorageConfiguration
-      val conf = readResource(configResource)
-      val amendedConf = conf
-        .replace("$PORT", realPort.toString)
-        .replace("$STORAGE_PORT", storagePort.toString)
-        .replace("$DIR", cassandraDirectory.getAbsolutePath)
-        .replace("$HOST", realHost)
-      val configFile = new File(cassandraDirectory, configResource)
-      writeToFile(configFile, amendedConf)
-
-      // Extract the cassandra bundle to the directory
-      val cassandraBundleFile =
-        new File(cassandraDirectory, "cassandra-bundle.jar")
-      if (!cassandraBundleFile.exists()) {
-        val is =
-          
this.getClass.getClassLoader.getResourceAsStream("cassandra-bundle.jar")
-        try {
-          Files.copy(is, cassandraBundleFile.toPath)
-        } finally {
-          if (is != null) is.close()
-        }
-      }
-
-      startForked(configFile, cassandraBundleFile, classpath, realHost, 
realPort)
-    }
-  }
-
-  private def prepareCassandraDirectory(cassandraDirectory: File, clean: 
Boolean): Unit = {
-    if (clean) {
-      try {
-        deleteRecursive(cassandraDirectory)
-      } catch {
-        // deleteRecursive may throw AssertionError
-        case e: AssertionError =>
-          throw new CleanFailedException(e.getMessage, e)
-        case NonFatal(e) => throw new CleanFailedException(e.getMessage, e)
-      }
-    }
-
-    if (!cassandraDirectory.exists)
-      require(cassandraDirectory.mkdirs(), s"Couldn't create Cassandra 
directory [$cassandraDirectory]")
-  }
-
-  private def startForked(
-      configFile: File,
-      cassandraBundle: File,
-      classpath: immutable.Seq[String],
-      host: String,
-      port: Int): Unit = {
-    // Calculate classpath
-    val / = File.separator
-    val javaBin = s"${System.getProperty("java.home")}${/}bin${/}java"
-    val className = "org.apache.cassandra.service.CassandraDaemon"
-    val classpathArgument = (classpath :+ 
cassandraBundle.getAbsolutePath).mkString(File.pathSeparator)
-
-    val builder = new ProcessBuilder(
-      javaBin,
-      "-cp",
-      classpathArgument,
-      "-Dcassandra.config=file:" + configFile.getAbsoluteFile,
-      "-Dcassandra-foreground=true",
-      className).inheritIO()
-
-    val process = builder.start()
-
-    val shutdownHook = new Thread {
-      override def run(): Unit = {
-        process.destroyForcibly()
-      }
-    }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
-
-    // We wait for Cassandra to start listening before we return, since 
running in non fork mode will also not
-    // return until Cassandra has started listening.
-    waitForCassandraToListen(host, port)
-    cassandraDaemon = Some(new Closeable {
-      override def close(): Unit = {
-        process.destroy()
-        try {
-          Runtime.getRuntime.removeShutdownHook(shutdownHook)
-        } catch {
-          case _: IllegalStateException =>
-          // JVM is already shutting down
-        }
-
-        if (process.waitFor(ForcedShutdownTimeout.toMillis, 
TimeUnit.MILLISECONDS)) {
-          val exitStatus = process.exitValue()
-          // Java processes killed with SIGTERM may exit with a status of 143
-          if (exitStatus != 0 && exitStatus != 143) {
-            sys.error(s"Cassandra exited with non zero status: 
${process.exitValue()}")
-          }
-        } else {
-          process.destroyForcibly()
-          sys.error(s"Cassandra process did not stop within 
$ForcedShutdownTimeout, killing.")
-        }
-      }
-    })
-  }
-
-  /**
-   * Stops Cassandra. However, it will not be possible to start Cassandra
-   * again in same JVM.
-   */
-  def stop(): Unit = this.synchronized {
-    cassandraDaemon.foreach(_.close())
-    cassandraDaemon = None
-  }
-
-  private def readResource(resource: String): String = {
-    val sb = new StringBuilder
-    val is = getClass.getResourceAsStream("/" + resource)
-    require(is != null, s"resource [$resource] doesn't exist")
-    val reader = new BufferedReader(new InputStreamReader(is))
-    try {
-      var line = reader.readLine()
-      while (line != null) {
-        sb.append(line).append('\n')
-        line = reader.readLine()
-      }
-    } finally {
-      reader.close()
-    }
-    sb.toString
-  }
-
-  private def writeToFile(file: File, content: String): Unit = {
-    val writer = new BufferedWriter(new OutputStreamWriter(new 
FileOutputStream(file), "utf-8"))
-    try {
-      writer.write(content)
-    } finally {
-      writer.close()
-    }
-  }
-
-  private def waitForCassandraToListen(host: String, port: Int) = {
-    val deadline = AwaitListenTimeout.fromNow
-    @annotation.tailrec
-    def tryConnect(): Unit = {
-      val retry =
-        try {
-          new Socket(host, port).close()
-          false
-        } catch {
-          case _: IOException if deadline.hasTimeLeft() =>
-            Thread.sleep(AwaitListenPoll.toMillis)
-            true
-          case ioe: IOException =>
-            throw new RuntimeException(s"Cassandra did not start within 
$AwaitListenTimeout", ioe)
-        }
-      if (retry) tryConnect()
-    }
-    tryConnect()
-  }
-
-  private def deleteRecursive(file: File): Unit = {
-    if (file.isDirectory) {
-      file.listFiles().foreach(deleteRecursive)
-    }
-    file.delete()
-  }
-
-}
diff --git 
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
 
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
index f29c923..b522ea5 100644
--- 
a/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
+++ 
b/core/src/multi-jvm/scala/org/apache/pekko/cluster/persistence/cassandra/EventsByTagMultiJvmSpec.scala
@@ -11,8 +11,9 @@ package org.apache.pekko.cluster.persistence.cassandra
 
 import com.typesafe.config.ConfigFactory
 import org.apache.pekko
+import pekko.persistence.cassandra.CassandraLifecycle
+import pekko.persistence.cassandra.query.TestActor
 import pekko.persistence.cassandra.query.scaladsl.CassandraReadJournal
-import pekko.persistence.cassandra.testkit.CassandraLauncher
 import pekko.persistence.journal.Tagged
 import pekko.persistence.query.{ NoOffset, PersistenceQuery }
 import pekko.remote.testkit.{ MultiNodeConfig, MultiNodeSpec }
@@ -20,8 +21,7 @@ import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.scaladsl.TestSink
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
-
-import java.io.File
+import org.testcontainers.cassandra.CassandraContainer
 
 object EventsByTagMultiJvmSpec extends MultiNodeConfig {
   // No way to start and distribute the port so hard coding
@@ -53,6 +53,13 @@ object EventsByTagMultiJvmSpec extends MultiNodeConfig {
           keyspace = $name
         }
       }
+
+      datastax-java-driver {
+        basic {
+          load-balancing-policy.local-datacenter = "datacenter1"
+          contact-points = ["127.0.0.1:$CassPort"]
+        }
+      }
     """).withFallback(CassandraLifecycle.config))
 
 }
@@ -75,6 +82,8 @@ abstract class EventsByTagMultiJvmSpec
 
   override def initialParticipants: Int = roles.size
 
+  @volatile private var cassandraContainer: CassandraContainer[_] = _
+
   "EventsByTag" must {
 
     "init Cassandra" in {
@@ -147,26 +156,26 @@ abstract class EventsByTagMultiJvmSpec
       }
 
       enterBarrier("all-done")
+
+      runOn(node1) {
+        stopCassandra()
+      }
     }
   }
 
-  def startCassandra(
-      host: String,
-      port: Int,
-      systemName: String,
-      cassandraConfigResource: String = 
CassandraLauncher.DefaultTestConfigResource): Unit = {
-    val cassandraDirectory = new File(s"target/$systemName-$port")
-    CassandraLauncher.start(
-      cassandraDirectory,
-      configResource = cassandraConfigResource,
-      clean = true,
-      port = port,
-      CassandraLauncher.classpathForResources("logback-test.xml"),
-      Some(host))
+  def startCassandra(host: String, port: Int, systemName: String): Unit = {
+    // With testcontainers, Docker binds to all interfaces (0.0.0.0) by 
default,
+    // so the host parameter is not needed for binding.
+    cassandraContainer = new CassandraContainer("cassandra:3.11")
+    cassandraContainer.setPortBindings(java.util.Arrays.asList(s"$port:9042"))
+    cassandraContainer.start()
   }
 
   def stopCassandra(): Unit = {
-    CassandraLauncher.stop()
+    if (cassandraContainer != null) {
+      cassandraContainer.stop()
+      cassandraContainer = null
+    }
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
index c881641..ce84df4 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/ReconnectSpec.scala
@@ -13,17 +13,16 @@
 
 package org.apache.pekko.persistence.cassandra
 
-import java.io.File
 import org.apache.pekko
 import pekko.actor.{ ActorSystem, Props }
 import pekko.persistence.cassandra.CassandraLifecycle.AwaitPersistenceInit
 import pekko.testkit.{ ImplicitSender, SocketUtil, TestKit }
 import com.typesafe.config.ConfigFactory
-import pekko.persistence.cassandra.testkit.CassandraLauncher
 import org.scalatest.Suite
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpecLike
+import org.testcontainers.cassandra.CassandraContainer
 
 object ReconnectSpec {
   val freePort = SocketUtil.temporaryLocalPort()
@@ -52,17 +51,14 @@ class ReconnectSpec
       pa ! "hello"
       expectNoMessage()
 
-      CassandraLauncher.start(
-        new File("target/ReconnectSpec"),
-        configResource = CassandraLauncher.DefaultTestConfigResource,
-        clean = true,
-        port = ReconnectSpec.freePort,
-        CassandraLauncher.classpathForResources("logback-test.xml"))
+      val cassandraContainer = new CassandraContainer("cassandra:3.11")
+      
cassandraContainer.setPortBindings(java.util.Arrays.asList(s"${ReconnectSpec.freePort}:9042"))
+      cassandraContainer.start()
 
       try {
         CassandraLifecycle.awaitPersistenceInit(system)
       } finally {
-        CassandraLauncher.stop()
+        cassandraContainer.stop()
       }
 
     }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
index ddda87f..396578f 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/query/EventsByTagSpec.scala
@@ -16,12 +16,15 @@ package org.apache.pekko.persistence.cassandra.query
 import java.time.temporal.ChronoUnit
 import java.time.{ LocalDate, LocalDateTime, ZoneOffset }
 import java.util.Optional
+
+import scala.concurrent.duration._
+
 import java.util.UUID
 import org.apache.pekko
 import pekko.actor.{ PoisonPill, Props }
 import pekko.event.Logging.Warning
 import pekko.persistence.cassandra.journal.CassandraJournalStatements
-import pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day }
+import pekko.persistence.cassandra.{ CassandraLifecycle, CassandraSpec, Day, 
PluginSettings }
 import pekko.persistence.journal.{ Tagged, WriteEventAdapter }
 import pekko.persistence.query.scaladsl.{ CurrentEventsByTagQuery, 
EventsByTagQuery }
 import pekko.persistence.query.{ EventEnvelope, NoOffset, Offset, 
TimeBasedUUID }
@@ -38,9 +41,6 @@ import com.typesafe.config.{ Config, ConfigFactory }
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.concurrent.Eventually.eventually
 
-import scala.concurrent.duration._
-import pekko.persistence.cassandra.PluginSettings
-
 object EventsByTagSpec {
   def withProbe[T](probe: TestSubscriber.Probe[Any], f: 
TestSubscriber.Probe[Any] => T): T = {
     try {
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
deleted file mode 100644
index 354b74b..0000000
--- 
a/core/src/test/scala/org/apache/pekko/persistence/cassandra/testkit/CassandraLauncherSpec.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * license agreements; and to You under the Apache License, version 2.0:
- *
- *   https://www.apache.org/licenses/LICENSE-2.0
- *
- * This file is part of the Apache Pekko project, which was derived from Akka.
- */
-
-/*
- * Copyright (C) 2016-2020 Lightbend Inc. <https://www.lightbend.com>
- */
-
-package org.apache.pekko.persistence.cassandra.testkit
-
-import java.io.File
-import java.net.InetSocketAddress
-
-import org.apache.pekko
-import pekko.actor.ActorSystem
-import pekko.testkit.TestKit
-import com.datastax.oss.driver.api.core.CqlSession
-import org.scalatest.wordspec.AnyWordSpecLike
-import org.scalatest.matchers.should.Matchers
-
-import scala.concurrent.duration._
-import org.scalatest.BeforeAndAfterAll
-
-class CassandraLauncherSpec
-    extends TestKit(ActorSystem("CassandraLauncherSpec"))
-    with Matchers
-    with AnyWordSpecLike
-    with BeforeAndAfterAll {
-
-  override protected def afterAll(): Unit = {
-    shutdown(system, verifySystemShutdown = true)
-    CassandraLauncher.stop()
-    super.afterAll()
-  }
-
-  private def testCassandra(): Unit = {
-    val session =
-      CqlSession
-        .builder()
-        .withLocalDatacenter("datacenter1")
-        .addContactPoint(new InetSocketAddress("localhost", 
CassandraLauncher.randomPort))
-        .build()
-    try session.execute("SELECT now() from system.local;").one()
-    finally {
-      session.close()
-    }
-  }
-
-  "The CassandraLauncher" must {
-    "support forking" in {
-      val cassandraDirectory = new File("target/" + system.name)
-      CassandraLauncher.start(
-        cassandraDirectory,
-        configResource = CassandraLauncher.DefaultTestConfigResource,
-        clean = true,
-        port = 0,
-        CassandraLauncher.classpathForResources("logback-test.xml"))
-
-      awaitAssert({
-          testCassandra()
-        }, 45.seconds)
-
-      CassandraLauncher.stop()
-
-      an[Exception] shouldBe thrownBy(testCassandra())
-    }
-  }
-
-}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 36b7337..d4b47fb 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -31,12 +31,15 @@ object Dependencies {
   val nettyVersion = "4.2.14.Final"
   val logback = "ch.qos.logback" % "logback-classic" % logbackVersion
 
+  val testcontainersVersion = "2.0.5"
+
   val pekkoPersistenceCassandraDependencies = Seq(
     "org.apache.cassandra" % "java-driver-core" % driverVersion,
     "io.netty" % "netty-handler" % nettyVersion,
     logback % Test,
     "org.scalatest" %% "scalatest" % "3.2.20" % Test,
-    "org.pegdown" % "pegdown" % "1.6.0" % Test)
+    "org.pegdown" % "pegdown" % "1.6.0" % Test,
+    "org.testcontainers" % "testcontainers-cassandra" % testcontainersVersion 
% Test)
 
   val exampleDependencies = Seq(
     logback,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to