This is an automated email from the ASF dual-hosted git repository.

pingtimeout pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris-tools.git


The following commit(s) were added to refs/heads/main by this push:
     new 5590740  feat: introduce SetupActions class (#90)
5590740 is described below

commit 5590740cfa1591d6e3a3f7363b897dfcd73568c2
Author: Artur Rakhmatulin <[email protected]>
AuthorDate: Wed Dec 10 16:41:13 2025 +0000

    feat: introduce SetupActions class (#90)
    
    - code deduplication
    - introduce SetupActions class
    - introduce AuthParameters
    - enhance auth-flow configuration
    - decouple auth-parameters from ConnectionParameters
    - move the logic governing accessToken into SetupActions
---
 benchmarks/README.md                               |  11 +-
 .../src/gatling/resources/benchmark-defaults.conf  |   9 ++
 .../benchmarks/actions/AuthenticationActions.scala |  30 ++---
 .../polaris/benchmarks/actions/SetupActions.scala  | 126 +++++++++++++++++++++
 ...ectionParameters.scala => AuthParameters.scala} |  22 ++--
 .../benchmarks/parameters/BenchmarkConfig.scala    |  11 +-
 .../parameters/ConnectionParameters.scala          |   6 +-
 .../benchmarks/simulations/CreateCommits.scala     |  56 ++-------
 .../benchmarks/simulations/CreateTreeDataset.scala |  58 +++-------
 .../benchmarks/simulations/ReadTreeDataset.scala   |  61 +++-------
 .../simulations/ReadUpdateTreeDataset.scala        |  51 ++-------
 .../WeightedWorkloadOnTreeDataset.scala            |  40 ++-----
 12 files changed, 244 insertions(+), 237 deletions(-)

diff --git a/benchmarks/README.md b/benchmarks/README.md
index 954f6f5..a3dc868 100644
--- a/benchmarks/README.md
+++ b/benchmarks/README.md
@@ -57,16 +57,25 @@ dataset.tree {
 
 ### Connection Parameters
 
-Connection settings are configured under `http` and `auth`:
+Connection settings are configured under `http`:
 
 ```hocon
 http {
   base-url = "http://localhost:8181"  # Service URL
 }
+```
+
+### Authentication Parameters
 
+Authentication settings are configured under `auth`:
+
+```hocon
 auth {
   client-id = null      # Required: OAuth2 client ID
   client-secret = null  # Required: OAuth2 client secret
+  max-retries = 10      # Maximum number of retry attempts for authentication failures
+  retryable-http-codes = [500]  # HTTP status codes that should trigger a retry
+  refresh-interval-seconds = 60  # Refresh interval for the authentication token in seconds
 }
 ```
 
diff --git a/benchmarks/src/gatling/resources/benchmark-defaults.conf 
b/benchmarks/src/gatling/resources/benchmark-defaults.conf
index 7106536..9caeec0 100644
--- a/benchmarks/src/gatling/resources/benchmark-defaults.conf
+++ b/benchmarks/src/gatling/resources/benchmark-defaults.conf
@@ -33,6 +33,15 @@ auth {
   # OAuth2 client secret for authentication
   # Required: Must be provided in configuration
   client-secret = null
+
+  # Refresh interval for the authentication token in seconds
+  refresh-interval-seconds = 60
+
+  # Maximum number of retry attempts for authentication failures
+  max-retries = 10
+
+  # HTTP status codes that should trigger a retry
+  retryable-http-codes = [500]
 }
 
 # Dataset tree structure configuration
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
index 8c8b04e..8228ecc 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/AuthenticationActions.scala
@@ -26,7 +26,7 @@ import org.apache.polaris.benchmarks.RetryOnHttpCodes.{
   retryOnHttpStatus,
   HttpRequestBuilderWithStatusSave
 }
-import org.apache.polaris.benchmarks.parameters.ConnectionParameters
+import org.apache.polaris.benchmarks.parameters.{AuthParameters, 
ConnectionParameters}
 import org.slf4j.LoggerFactory
 
 import java.util.concurrent.atomic.AtomicReference
@@ -35,30 +35,28 @@ import java.util.concurrent.atomic.AtomicReference
  * Actions for performance testing authentication operations. This class 
provides methods to
  * authenticate and manage access tokens for API requests.
  *
- * @param cp Connection parameters containing client credentials
+ * @param cp Connection parameters containing the base URL
+ * @param ap Authentication parameters
  * @param accessToken Reference to the authentication token shared across 
actions
- * @param maxRetries Maximum number of retry attempts for failed operations
- * @param retryableHttpCodes HTTP status codes that should trigger a retry
  */
 case class AuthenticationActions(
     cp: ConnectionParameters,
-    accessToken: AtomicReference[String],
-    maxRetries: Int = 10,
-    retryableHttpCodes: Set[Int] = Set(500)
+    ap: AuthParameters,
+    accessToken: AtomicReference[String]
 ) {
   private val logger = LoggerFactory.getLogger(getClass)
 
   /**
    * Creates a Gatling Feeder that provides authentication credentials. The 
feeder continuously
-   * supplies client ID and client secret from the connection parameters for 
use in authentication
-   * requests.
+   * supplies client ID and client secret from the authentication parameters 
for use in
+   * authentication requests.
    *
    * @return An iterator providing client credentials
    */
   def feeder(): Feeder[String] = Iterator.continually(
     Map(
-      "clientId" -> cp.clientId,
-      "clientSecret" -> cp.clientSecret
+      "clientId" -> ap.clientId,
+      "clientSecret" -> ap.clientSecret
     )
   )
 
@@ -71,7 +69,7 @@ case class AuthenticationActions(
    * There is no limit to the maximum number of users that can authenticate 
concurrently.
    */
   val authenticateAndSaveAccessToken: ChainBuilder =
-    retryOnHttpStatus(maxRetries, retryableHttpCodes, "Authenticate")(
+    retryOnHttpStatus(ap.maxRetries, ap.retryableHttpCodes, "Authenticate")(
       http("Authenticate")
         .post("/api/catalog/v1/oauth/tokens")
         .header("Content-Type", "application/x-www-form-urlencoded")
@@ -89,12 +87,4 @@ case class AuthenticationActions(
         }
         session
       }
-
-  /**
-   * Restores the current access token from the shared reference into the 
Gatling session. This
-   * operation is useful when a scenario needs to reuse an authentication 
token from a previous
-   * scenario.
-   */
-  val restoreAccessTokenInSession: ChainBuilder =
-    exec(session => session.set("accessToken", accessToken.get()))
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
new file mode 100644
index 0000000..458ced4
--- /dev/null
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/actions/SetupActions.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.benchmarks.actions
+
+import io.gatling.core.Predef._
+import io.gatling.core.structure.{ChainBuilder, ScenarioBuilder}
+import org.apache.polaris.benchmarks.parameters.{AuthParameters, 
ConnectionParameters}
+
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
+import scala.concurrent.duration._
+
+/**
+ * Actions for setting up necessary shared states like authentication across 
benchmark simulations.
+ *
+ * @param cp Connection parameters containing the base URL
+ * @param ap Authentication parameters containing client credentials and retry 
settings
+ */
+case class SetupActions(
+    cp: ConnectionParameters,
+    ap: AuthParameters
+) {
+
+  /**
+   * Shared access token reference that can be passed to all action classes.
+   */
+  val accessToken: AtomicReference[String] = new AtomicReference()
+
+  /**
+   * Internal flag to control the token refresh loop.
+   */
+  private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
+
+  /**
+   * Authentication actions instance that handles the actual OAuth token 
operations.
+   */
+  private val authActions: AuthenticationActions =
+    AuthenticationActions(cp, ap, accessToken)
+
+  /**
+   * Continuously refreshes the OAuth token at the configured interval 
specified in
+   * [[AuthParameters.refreshIntervalSeconds]]. This scenario runs 
indefinitely in a loop controlled
+   * by the shouldRefreshToken flag until [[stopRefreshingToken]] is called.
+   *
+   * @return ScenarioBuilder that continuously refreshes the token
+   */
+  def continuouslyRefreshOauthToken(): ScenarioBuilder = {
+    val interval = ap.refreshIntervalSeconds.seconds
+    scenario(s"Authenticate every ${interval.toSeconds}s using the Iceberg REST API")
+      .asLongAs(_ => shouldRefreshToken.get()) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(interval)
+      }
+  }
+
+  /**
+   * Refreshes the OAuth token at the configured interval specified in
+   * [[AuthParameters.refreshIntervalSeconds]] for a specified duration. Unlike
+   * [[continuouslyRefreshOauthToken]], this method automatically stops after 
the duration expires
+   * without requiring an explicit stop call.
+   *
+   * @param duration Total duration to keep refreshing the token
+   * @return ScenarioBuilder that refreshes the token for the specified 
duration
+   */
+  def refreshOauthForDuration(duration: FiniteDuration): ScenarioBuilder = {
+    val interval = ap.refreshIntervalSeconds.seconds
+    scenario(s"Authenticate every ${interval.toSeconds}s using the Iceberg REST API")
+      .during(duration) {
+        feed(authActions.feeder())
+          .exec(authActions.authenticateAndSaveAccessToken)
+          .pause(interval)
+      }
+  }
+
+  /**
+   * Waits for the authentication token to be available before proceeding. 
This is useful when the
+   * authentication is performed by a separate scenario. This operation does 
not make any network
+   * requests.
+   *
+   * @return ScenarioBuilder that waits for token availability
+   */
+  val waitForAuthentication: ScenarioBuilder =
+    scenario("Wait for the authentication token to be available")
+      .asLongAs(_ => accessToken.get() == null) {
+        pause(1.second)
+      }
+
+  /**
+   * Stops the token refresh loop. This is useful when the authentication is 
performed by a separate
+   * scenario. This operation does not make any network requests.
+   *
+   * @return ScenarioBuilder that stops the token refresh
+   */
+  val stopRefreshingToken: ScenarioBuilder =
+    scenario("Stop refreshing the authentication token")
+      .exec { session =>
+        shouldRefreshToken.set(false)
+        session
+      }
+
+  /**
+   * Restores the access token from the shared reference to the Gatling 
session. This is useful when
+   * the authentication is performed by a separate scenario. This operation 
does not make any
+   * network requests.
+   *
+   * @return ChainBuilder that restores the token to the session
+   */
+  val restoreAccessTokenInSession: ChainBuilder =
+    exec(session => session.set("accessToken", accessToken.get()))
+}
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
similarity index 62%
copy from 
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
copy to 
benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
index f227ca8..f6f8733 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/AuthParameters.scala
@@ -19,18 +19,24 @@
 package org.apache.polaris.benchmarks.parameters
 
 /**
- * Case class to hold the connection parameters for the benchmark.
+ * Case class to hold the authentication parameters for the benchmark.
  *
  * @param clientId The client ID for authentication.
  * @param clientSecret The client secret for authentication.
- * @param baseUrl The base URL of the Polaris service.
+ * @param refreshIntervalSeconds Refresh interval for the authentication token 
in seconds.
+ * @param maxRetries Maximum number of retry attempts for authentication 
failures.
+ * @param retryableHttpCodes HTTP status codes that should trigger a retry.
  */
-case class ConnectionParameters(clientId: String, clientSecret: String, 
baseUrl: String) {
+case class AuthParameters(
+    clientId: String,
+    clientSecret: String,
+    refreshIntervalSeconds: Int,
+    maxRetries: Int,
+    retryableHttpCodes: Set[Int]
+) {
   require(clientId != null && clientId.nonEmpty, "Client ID cannot be null or 
empty")
   require(clientSecret != null && clientSecret.nonEmpty, "Client secret cannot 
be null or empty")
-  require(baseUrl != null && baseUrl.nonEmpty, "Base URL cannot be null or 
empty")
-  require(
-    baseUrl.startsWith("http://") || baseUrl.startsWith("https://"),
-    "Base URL must start with http:// or https://"
-  )
+  require(refreshIntervalSeconds > 0, "Refresh interval must be positive")
+  require(maxRetries >= 0, "Max retries cannot be negative")
+  require(retryableHttpCodes != null, "Retryable HTTP codes cannot be null")
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
index 1674127..2865768 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/BenchmarkConfig.scala
@@ -32,9 +32,15 @@ object BenchmarkConfig {
     val workload: Config = config.getConfig("workload")
 
     val connectionParams = ConnectionParameters(
+      http.getString("base-url")
+    )
+
+    val authParams = AuthParameters(
       auth.getString("client-id"),
       auth.getString("client-secret"),
-      http.getString("base-url")
+      auth.getInt("refresh-interval-seconds"),
+      auth.getInt("max-retries"),
+      
auth.getIntList("retryable-http-codes").toArray.map(_.asInstanceOf[Int]).toSet
     )
 
     val workloadParams = {
@@ -89,12 +95,13 @@ object BenchmarkConfig {
       dataset.getString("storage-config-info")
     )
 
-    BenchmarkConfig(connectionParams, workloadParams, datasetParams)
+    BenchmarkConfig(connectionParams, authParams, workloadParams, 
datasetParams)
   }
 }
 
 case class BenchmarkConfig(
     connectionParameters: ConnectionParameters,
+    authParameters: AuthParameters,
     workloadParameters: WorkloadParameters,
     datasetParameters: DatasetParameters
 ) {}
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
index f227ca8..b047fb3 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/parameters/ConnectionParameters.scala
@@ -21,13 +21,9 @@ package org.apache.polaris.benchmarks.parameters
 /**
  * Case class to hold the connection parameters for the benchmark.
  *
- * @param clientId The client ID for authentication.
- * @param clientSecret The client secret for authentication.
  * @param baseUrl The base URL of the Polaris service.
  */
-case class ConnectionParameters(clientId: String, clientSecret: String, 
baseUrl: String) {
-  require(clientId != null && clientId.nonEmpty, "Client ID cannot be null or 
empty")
-  require(clientSecret != null && clientSecret.nonEmpty, "Client secret cannot 
be null or empty")
+case class ConnectionParameters(baseUrl: String) {
   require(baseUrl != null && baseUrl.nonEmpty, "Base URL cannot be null or 
empty")
   require(
     baseUrl.startsWith("http://") || baseUrl.startsWith("https://"),
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
index 6f1749c..0c5254e 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateCommits.scala
@@ -21,14 +21,10 @@ package org.apache.polaris.benchmarks.simulations
 import io.gatling.core.Predef._
 import io.gatling.core.structure.ScenarioBuilder
 import io.gatling.http.Predef.http
-import org.apache.polaris.benchmarks.actions.{
-  AuthenticationActions,
-  NamespaceActions,
-  TableActions,
-  ViewActions
-}
+import org.apache.polaris.benchmarks.actions.{SetupActions, TableActions, 
ViewActions}
 import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
 import org.apache.polaris.benchmarks.parameters.{
+  AuthParameters,
   ConnectionParameters,
   DatasetParameters,
   WorkloadParameters
@@ -36,7 +32,6 @@ import org.apache.polaris.benchmarks.parameters.{
 import org.apache.polaris.benchmarks.util.CircularIterator
 import org.slf4j.LoggerFactory
 
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import scala.concurrent.duration._
 
 class CreateCommits extends Simulation {
@@ -46,45 +41,16 @@ class CreateCommits extends Simulation {
   // Load parameters
   // 
--------------------------------------------------------------------------------
   val cp: ConnectionParameters = config.connectionParameters
+  val ap: AuthParameters = config.authParameters
   val dp: DatasetParameters = config.datasetParameters
   val wp: WorkloadParameters = config.workloadParameters
 
   // 
--------------------------------------------------------------------------------
   // Helper values
   // 
--------------------------------------------------------------------------------
-  private val accessToken: AtomicReference[String] = new AtomicReference()
-  private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
-  private val authActions = AuthenticationActions(cp, accessToken)
-  private val tableActions = TableActions(dp, wp, accessToken)
-  private val viewActions = ViewActions(dp, wp, accessToken)
-
-  // 
--------------------------------------------------------------------------------
-  // Authentication related workloads:
-  // * Authenticate and store the access token for later use every minute
-  // * Wait for an OAuth token to be available
-  // * Stop the token refresh loop
-  // 
--------------------------------------------------------------------------------
-  val continuouslyRefreshOauthToken: ScenarioBuilder =
-    scenario("Authenticate every minute using the Iceberg REST API")
-      .asLongAs(_ => shouldRefreshToken.get())(
-        feed(authActions.feeder())
-          .exec(authActions.authenticateAndSaveAccessToken)
-          .pause(1.minute)
-      )
-
-  val waitForAuthentication: ScenarioBuilder =
-    scenario("Wait for the authentication token to be available")
-      .asLongAs(_ => accessToken.get() == null)(
-        pause(1.second)
-      )
-
-  val stopRefreshingToken: ScenarioBuilder =
-    scenario("Stop refreshing the authentication token")
-      .exec { session =>
-        shouldRefreshToken.set(false)
-        session
-      }
+  private val setupActions = SetupActions(cp, ap)
+  private val tableActions = TableActions(dp, wp, setupActions.accessToken)
+  private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
 
   // 
--------------------------------------------------------------------------------
   // Read and write workloads:
@@ -93,7 +59,7 @@ class CreateCommits extends Simulation {
   // 
--------------------------------------------------------------------------------
   val tableUpdateScenario: ScenarioBuilder =
     scenario("Create table commits by updating properties")
-      .exec(authActions.restoreAccessTokenInSession)
+      .exec(setupActions.restoreAccessTokenInSession)
       .feed(tableActions.propertyUpdateFeeder())
       .exec(tableActions.updateTable)
 
@@ -104,7 +70,7 @@ class CreateCommits extends Simulation {
   // 
--------------------------------------------------------------------------------
   val viewUpdateScenario: ScenarioBuilder =
     scenario("Create view commits by updating properties")
-      .exec(authActions.restoreAccessTokenInSession)
+      .exec(setupActions.restoreAccessTokenInSession)
       .feed(viewActions.propertyUpdateFeeder())
       .exec(viewActions.updateView)
 
@@ -117,8 +83,8 @@ class CreateCommits extends Simulation {
   private val viewCommitsThroughput = wp.createCommits.viewCommitsThroughput
   private val durationInMinutes = wp.createCommits.durationInMinutes
   setUp(
-    
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
-    waitForAuthentication
+    
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+    setupActions.waitForAuthentication
       .inject(atOnceUsers(1))
       .andThen(
         tableUpdateScenario
@@ -132,6 +98,6 @@ class CreateCommits extends Simulation {
           )
           .protocols(httpProtocol)
       )
-      
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+      
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
   )
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
index ae04756..bec73a7 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/CreateTreeDataset.scala
@@ -24,13 +24,14 @@ import io.gatling.http.Predef._
 import org.apache.polaris.benchmarks.actions._
 import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
 import org.apache.polaris.benchmarks.parameters.{
+  AuthParameters,
   ConnectionParameters,
   DatasetParameters,
   WorkloadParameters
 }
 import org.slf4j.LoggerFactory
 
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
+import java.util.concurrent.atomic.AtomicInteger
 import scala.concurrent.duration._
 
 /**
@@ -44,6 +45,7 @@ class CreateTreeDataset extends Simulation {
   // Load parameters
   // 
--------------------------------------------------------------------------------
   val cp: ConnectionParameters = config.connectionParameters
+  val ap: AuthParameters = config.authParameters
   val dp: DatasetParameters = config.datasetParameters
   val wp: WorkloadParameters = config.workloadParameters
 
@@ -51,53 +53,23 @@ class CreateTreeDataset extends Simulation {
   // Helper values
   // 
--------------------------------------------------------------------------------
   private val numNamespaces: Int = dp.nAryTree.numberOfNodes
-  private val accessToken: AtomicReference[String] = new AtomicReference()
-  private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
-  private val authenticationActions = AuthenticationActions(cp, accessToken, 
5, Set(500))
-  private val catalogActions = CatalogActions(dp, accessToken, 0, Set())
-  private val namespaceActions = NamespaceActions(dp, wp, accessToken, 5, 
Set(500))
-  private val tableActions = TableActions(dp, wp, accessToken, 0, Set())
-  private val viewActions = ViewActions(dp, wp, accessToken, 0, Set())
+  private val setupActions = SetupActions(cp, ap)
+  private val catalogActions = CatalogActions(dp, setupActions.accessToken, 0, 
Set())
+  private val namespaceActions = NamespaceActions(dp, wp, 
setupActions.accessToken, 5, Set(500))
+  private val tableActions = TableActions(dp, wp, setupActions.accessToken, 0, 
Set())
+  private val viewActions = ViewActions(dp, wp, setupActions.accessToken, 0, 
Set())
 
   private val createdCatalogs = new AtomicInteger()
   private val createdNamespaces = new AtomicInteger()
   private val createdTables = new AtomicInteger()
   private val createdViews = new AtomicInteger()
 
-  // 
--------------------------------------------------------------------------------
-  // Authentication related workloads:
-  // * Authenticate and store the access token for later use every minute
-  // * Wait for an OAuth token to be available
-  // * Stop the token refresh loop
-  // 
--------------------------------------------------------------------------------
-  val continuouslyRefreshOauthToken: ScenarioBuilder =
-    scenario("Authenticate every minute using the Iceberg REST API")
-      .asLongAs(_ => shouldRefreshToken.get()) {
-        feed(authenticationActions.feeder())
-          .exec(authenticationActions.authenticateAndSaveAccessToken)
-          .pause(1.minute)
-      }
-
-  val waitForAuthentication: ScenarioBuilder =
-    scenario("Wait for the authentication token to be available")
-      .asLongAs(_ => accessToken.get() == null) {
-        pause(1.second)
-      }
-
-  val stopRefreshingToken: ScenarioBuilder =
-    scenario("Stop refreshing the authentication token")
-      .exec { session =>
-        shouldRefreshToken.set(false)
-        session
-      }
-
   // 
--------------------------------------------------------------------------------
   // Workload: Create catalogs
   // 
--------------------------------------------------------------------------------
   val createCatalogs: ScenarioBuilder =
     scenario("Create catalogs using the Polaris Management REST API")
-      .exec(authenticationActions.restoreAccessTokenInSession)
+      .exec(setupActions.restoreAccessTokenInSession)
       .asLongAs(session =>
         createdCatalogs.getAndIncrement() < dp.numCatalogs && 
session.contains("accessToken")
       )(
@@ -109,7 +81,7 @@ class CreateTreeDataset extends Simulation {
   // Workload: Create namespaces
   // 
--------------------------------------------------------------------------------
   val createNamespaces: ScenarioBuilder = scenario("Create namespaces using 
the Iceberg REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       createdNamespaces.getAndIncrement() < numNamespaces && 
session.contains("accessToken")
     )(
@@ -121,7 +93,7 @@ class CreateTreeDataset extends Simulation {
   // Workload: Create tables
   // 
--------------------------------------------------------------------------------
   val createTables: ScenarioBuilder = scenario("Create tables using the 
Iceberg REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       createdTables.getAndIncrement() < dp.numTables && 
session.contains("accessToken")
     )(
@@ -133,7 +105,7 @@ class CreateTreeDataset extends Simulation {
   // Workload: Create views
   // 
--------------------------------------------------------------------------------
   val createViews: ScenarioBuilder = scenario("Create views using the Iceberg 
REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       createdViews.getAndIncrement() < dp.numViews && 
session.contains("accessToken")
     )(
@@ -155,8 +127,8 @@ class CreateTreeDataset extends Simulation {
   private val viewThroughput = wp.createTreeDataset.viewThroughput
 
   setUp(
-    
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
-    waitForAuthentication
+    
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+    setupActions.waitForAuthentication
       .inject(atOnceUsers(1))
       .andThen(createCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol))
       .andThen(
@@ -169,6 +141,6 @@ class CreateTreeDataset extends Simulation {
       )
       
.andThen(createTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol))
       
.andThen(createViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol))
-      
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+      
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
   )
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
index 44a0077..2b8d9e0 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadTreeDataset.scala
@@ -23,11 +23,11 @@ import io.gatling.core.structure.ScenarioBuilder
 import io.gatling.http.Predef._
 import org.apache.polaris.benchmarks.actions._
 import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
-import org.apache.polaris.benchmarks.parameters.WorkloadParameters
+import org.apache.polaris.benchmarks.parameters.{AuthParameters, 
WorkloadParameters}
 import org.slf4j.LoggerFactory
 
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
-import scala.concurrent.duration.DurationInt
+import java.util.concurrent.atomic.AtomicInteger
+import scala.concurrent.duration._
 
 /**
  * This simulation is a 100% read workload that fetches a tree dataset in 
Polaris. It is intended to
@@ -42,6 +42,7 @@ class ReadTreeDataset extends Simulation {
   // Load parameters
   // 
--------------------------------------------------------------------------------
   private val cp = config.connectionParameters
+  private val ap: AuthParameters = config.authParameters
   private val dp = config.datasetParameters
   val wp: WorkloadParameters = config.workloadParameters
 
@@ -49,52 +50,22 @@ class ReadTreeDataset extends Simulation {
   // Helper values
   // 
--------------------------------------------------------------------------------
   private val numNamespaces: Int = dp.nAryTree.numberOfNodes
-  private val accessToken: AtomicReference[String] = new AtomicReference()
-  private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
-  private val authenticationActions = AuthenticationActions(cp, accessToken)
-  private val catalogActions = CatalogActions(dp, accessToken)
-  private val namespaceActions = NamespaceActions(dp, wp, accessToken)
-  private val tableActions = TableActions(dp, wp, accessToken)
-  private val viewActions = ViewActions(dp, wp, accessToken)
+  private val setupActions = SetupActions(cp, ap)
+  private val catalogActions = CatalogActions(dp, setupActions.accessToken)
+  private val namespaceActions = NamespaceActions(dp, wp, 
setupActions.accessToken)
+  private val tableActions = TableActions(dp, wp, setupActions.accessToken)
+  private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
 
   private val verifiedCatalogs = new AtomicInteger()
   private val verifiedNamespaces = new AtomicInteger()
   private val verifiedTables = new AtomicInteger()
   private val verifiedViews = new AtomicInteger()
 
-  // 
--------------------------------------------------------------------------------
-  // Authentication related workloads:
-  // * Authenticate and store the access token for later use every minute
-  // * Wait for an OAuth token to be available
-  // * Stop the token refresh loop
-  // 
--------------------------------------------------------------------------------
-  val continuouslyRefreshOauthToken: ScenarioBuilder =
-    scenario("Authenticate every minute using the Iceberg REST API")
-      .asLongAs(_ => shouldRefreshToken.get()) {
-        feed(authenticationActions.feeder())
-          .exec(authenticationActions.authenticateAndSaveAccessToken)
-          .pause(1.minute)
-      }
-
-  val waitForAuthentication: ScenarioBuilder =
-    scenario("Wait for the authentication token to be available")
-      .asLongAs(_ => accessToken.get() == null) {
-        pause(1.second)
-      }
-
-  val stopRefreshingToken: ScenarioBuilder =
-    scenario("Stop refreshing the authentication token")
-      .exec { session =>
-        shouldRefreshToken.set(false)
-        session
-      }
-
   // 
--------------------------------------------------------------------------------
   // Workload: Verify each catalog
   // 
--------------------------------------------------------------------------------
   private val verifyCatalogs = scenario("Verify catalogs using the Polaris 
Management REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       verifiedCatalogs.getAndIncrement() < dp.numCatalogs && 
session.contains("accessToken")
     )(
@@ -106,7 +77,7 @@ class ReadTreeDataset extends Simulation {
   // Workload: Verify namespaces
   // 
--------------------------------------------------------------------------------
   private val verifyNamespaces = scenario("Verify namespaces using the Iceberg 
REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       verifiedNamespaces.getAndIncrement() < numNamespaces && 
session.contains("accessToken")
     )(
@@ -120,7 +91,7 @@ class ReadTreeDataset extends Simulation {
   // Workload: Verify tables
   // 
--------------------------------------------------------------------------------
   private val verifyTables = scenario("Verify tables using the Iceberg REST 
API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       verifiedTables.getAndIncrement() < dp.numTables && 
session.contains("accessToken")
     )(
@@ -134,7 +105,7 @@ class ReadTreeDataset extends Simulation {
   // Workload: Verify views
   // 
--------------------------------------------------------------------------------
   private val verifyViews = scenario("Verify views using the Iceberg REST API")
-    .exec(authenticationActions.restoreAccessTokenInSession)
+    .exec(setupActions.restoreAccessTokenInSession)
     .asLongAs(session =>
       verifiedViews.getAndIncrement() < dp.numViews && 
session.contains("accessToken")
     )(
@@ -158,13 +129,13 @@ class ReadTreeDataset extends Simulation {
   private val viewThroughput = wp.readTreeDataset.viewThroughput
 
   setUp(
-    
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
-    waitForAuthentication
+    
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+    setupActions.waitForAuthentication
       .inject(atOnceUsers(1))
       .andThen(verifyCatalogs.inject(atOnceUsers(1)).protocols(httpProtocol))
       
.andThen(verifyNamespaces.inject(atOnceUsers(dp.nsDepth)).protocols(httpProtocol))
       
.andThen(verifyTables.inject(atOnceUsers(tableThroughput)).protocols(httpProtocol))
       
.andThen(verifyViews.inject(atOnceUsers(viewThroughput)).protocols(httpProtocol))
-      
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+      
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
   )
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
index 1827c0c..abf744f 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/ReadUpdateTreeDataset.scala
@@ -24,6 +24,7 @@ import io.gatling.http.Predef._
 import org.apache.polaris.benchmarks.actions._
 import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
 import org.apache.polaris.benchmarks.parameters.{
+  AuthParameters,
   ConnectionParameters,
   DatasetParameters,
   WorkloadParameters
@@ -31,7 +32,6 @@ import org.apache.polaris.benchmarks.parameters.{
 import org.apache.polaris.benchmarks.util.CircularIterator
 import org.slf4j.LoggerFactory
 
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import scala.concurrent.duration._
 
 /**
@@ -47,20 +47,18 @@ class ReadUpdateTreeDataset extends Simulation {
   // Load parameters
   // 
--------------------------------------------------------------------------------
   val cp: ConnectionParameters = config.connectionParameters
+  val ap: AuthParameters = config.authParameters
   val dp: DatasetParameters = config.datasetParameters
   val wp: WorkloadParameters = config.workloadParameters
 
   // 
--------------------------------------------------------------------------------
   // Helper values
   // 
--------------------------------------------------------------------------------
-  private val accessToken: AtomicReference[String] = new AtomicReference()
-  private val shouldRefreshToken: AtomicBoolean = new AtomicBoolean(true)
-
-  private val authActions = AuthenticationActions(cp, accessToken)
-  private val catActions = CatalogActions(dp, accessToken)
-  private val nsActions = NamespaceActions(dp, wp, accessToken)
-  private val tblActions = TableActions(dp, wp, accessToken)
-  private val viewActions = ViewActions(dp, wp, accessToken)
+  private val setupActions = SetupActions(cp, ap)
+  private val catActions = CatalogActions(dp, setupActions.accessToken)
+  private val nsActions = NamespaceActions(dp, wp, setupActions.accessToken)
+  private val tblActions = TableActions(dp, wp, setupActions.accessToken)
+  private val viewActions = ViewActions(dp, wp, setupActions.accessToken)
 
   private val nsListFeeder = new 
CircularIterator(nsActions.namespaceIdentityFeeder)
   private val nsExistsFeeder = new 
CircularIterator(nsActions.namespaceIdentityFeeder)
@@ -77,39 +75,12 @@ class ReadUpdateTreeDataset extends Simulation {
   private val viewFetchFeeder = new 
CircularIterator(viewActions.viewFetchFeeder)
   private val viewUpdateFeeder = viewActions.propertyUpdateFeeder()
 
-  // 
--------------------------------------------------------------------------------
-  // Authentication related workloads:
-  // * Authenticate and store the access token for later use every minute
-  // * Wait for an OAuth token to be available
-  // * Stop the token refresh loop
-  // 
--------------------------------------------------------------------------------
-  val continuouslyRefreshOauthToken: ScenarioBuilder =
-    scenario("Authenticate every minute using the Iceberg REST API")
-      .asLongAs(_ => shouldRefreshToken.get()) {
-        feed(authActions.feeder())
-          .exec(authActions.authenticateAndSaveAccessToken)
-          .pause(1.minute)
-      }
-
-  val waitForAuthentication: ScenarioBuilder =
-    scenario("Wait for the authentication token to be available")
-      .asLongAs(_ => accessToken.get() == null) {
-        pause(1.second)
-      }
-
-  val stopRefreshingToken: ScenarioBuilder =
-    scenario("Stop refreshing the authentication token")
-      .exec { session =>
-        shouldRefreshToken.set(false)
-        session
-      }
-
   // 
--------------------------------------------------------------------------------
   // Workload: Randomly read and write entities
   // 
--------------------------------------------------------------------------------
   val readWriteScenario: ScenarioBuilder =
     scenario("Read and write entities using the Iceberg REST API")
-      .exec(authActions.restoreAccessTokenInSession)
+      .exec(setupActions.restoreAccessTokenInSession)
       .randomSwitch(
         wp.readUpdateTreeDataset.gatlingReadRatio -> group("Read")(
           uniformRandomSwitch(
@@ -147,8 +118,8 @@ class ReadUpdateTreeDataset extends Simulation {
   private val durationInMinutes = wp.readUpdateTreeDataset.durationInMinutes
 
   setUp(
-    
continuouslyRefreshOauthToken.inject(atOnceUsers(1)).protocols(httpProtocol),
-    waitForAuthentication
+    
setupActions.continuouslyRefreshOauthToken().inject(atOnceUsers(1)).protocols(httpProtocol),
+    setupActions.waitForAuthentication
       .inject(atOnceUsers(1))
       .andThen(
         readWriteScenario
@@ -157,6 +128,6 @@ class ReadUpdateTreeDataset extends Simulation {
           )
           .protocols(httpProtocol)
       )
-      
.andThen(stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
+      
.andThen(setupActions.stopRefreshingToken.inject(atOnceUsers(1)).protocols(httpProtocol))
   )
 }
diff --git 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
index f6e8dc1..bc6376e 100644
--- 
a/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
+++ 
b/benchmarks/src/gatling/scala/org/apache/polaris/benchmarks/simulations/WeightedWorkloadOnTreeDataset.scala
@@ -24,6 +24,7 @@ import io.gatling.http.Predef._
 import org.apache.polaris.benchmarks.actions._
 import org.apache.polaris.benchmarks.parameters.BenchmarkConfig.config
 import org.apache.polaris.benchmarks.parameters.{
+  AuthParameters,
   ConnectionParameters,
   DatasetParameters,
   Distribution,
@@ -32,7 +33,6 @@ import org.apache.polaris.benchmarks.parameters.{
 }
 import org.slf4j.LoggerFactory
 
-import java.util.concurrent.atomic.AtomicReference
 import scala.concurrent.duration._
 
 /**
@@ -47,6 +47,7 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
   // Load parameters
   // 
--------------------------------------------------------------------------------
   val cp: ConnectionParameters = config.connectionParameters
+  val ap: AuthParameters = config.authParameters
   val dp: DatasetParameters = config.datasetParameters
   val wp: WorkloadParameters = config.workloadParameters
 
@@ -62,30 +63,13 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
   println("### Writer distributions ###")
   wp.weightedWorkloadOnTreeDataset.writers.foreach(_.printDescription(dp))
 
-  // 
--------------------------------------------------------------------------------
-  // Helper values
-  // 
--------------------------------------------------------------------------------
-  private val accessToken: AtomicReference[String] = new AtomicReference()
-
-  private val authActions = AuthenticationActions(cp, accessToken)
-  private val tblActions = TableActions(dp, wp, accessToken)
+  private val duration = 
wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes
 
   // 
--------------------------------------------------------------------------------
-  // Authentication related workloads
+  // Helper values
   // 
--------------------------------------------------------------------------------
-  val refreshOauthForDuration: ScenarioBuilder =
-    scenario("Authenticate every 30s using the Iceberg REST API")
-      .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
-        feed(authActions.feeder())
-          .exec(authActions.authenticateAndSaveAccessToken)
-          .pause(30.seconds)
-      }
-
-  val waitForAuthentication: ScenarioBuilder =
-    scenario("Wait for the authentication token to be available")
-      .asLongAs(_ => accessToken.get() == null) {
-        pause(1.second)
-      }
+  private val setupActions = SetupActions(cp, ap)
+  private val tblActions = TableActions(dp, wp, setupActions.accessToken)
 
   // 
--------------------------------------------------------------------------------
   // Build up the HTTP protocol configuration and set up the simulation
@@ -105,8 +89,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
         val rnp =
           RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, ((i + 1) 
* 1000) + threadId)
         scenario(s"Reader-$i-$threadId")
-          .exec(authActions.restoreAccessTokenInSession)
-          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+          .exec(setupActions.restoreAccessTokenInSession)
+          .during(duration) {
             exec { session =>
               val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
               val (catalog, namespace, table) =
@@ -136,8 +120,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
         val rnp =
           RandomNumberProvider(wp.weightedWorkloadOnTreeDataset.seed, ((i + 1) 
* 2000) + threadId)
         scenario(s"Writer-$i-$threadId")
-          .exec(authActions.restoreAccessTokenInSession)
-          .during(wp.weightedWorkloadOnTreeDataset.durationInMinutes.minutes) {
+          .exec(setupActions.restoreAccessTokenInSession)
+          .during(duration) {
             exec { session =>
               val tableIndex = dist.sample(dp.maxPossibleTables, rnp)
               val (catalog, namespace, table) =
@@ -161,8 +145,8 @@ class WeightedWorkloadOnTreeDataset extends Simulation {
   // Setup
   // 
--------------------------------------------------------------------------------
   setUp(
-    refreshOauthForDuration.inject(atOnceUsers(1)).protocols(httpProtocol),
-    waitForAuthentication
+    
setupActions.refreshOauthForDuration(duration).inject(atOnceUsers(1)).protocols(httpProtocol),
+    setupActions.waitForAuthentication
       .inject(atOnceUsers(1))
       .protocols(httpProtocol)
       .andThen(


Reply via email to