This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e1ee445762 [SPARK-55494][SS] Introduce iterator/prefixScan with 
multi-values in StateStore API
d9e1ee445762 is described below

commit d9e1ee44576285f07c4d1803eec411a764a48d8b
Author: Jungtaek Lim <[email protected]>
AuthorDate: Thu Feb 19 06:53:15 2026 +0900

    [SPARK-55494][SS] Introduce iterator/prefixScan with multi-values in 
StateStore API
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce iterator/prefixScan with multi-values in 
StateStore API.
    
    ### Why are the changes needed?
    
    The functionality is missing on StateStore API so when the caller sets 
multi-values for specific CF, that CF doesn't support scanning through the 
data. The new functionality will be used in new state format version in 
stream-stream join, specifically SPARK-55144 (#53930).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New UTs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: claude-4.5-sonnet
    
    The above is used for creating a new test suite. All other parts aren't 
generated by LLM.
    
    Closes #54278 from HeartSaVioR/SPARK-55494.
    
    Authored-by: Jungtaek Lim <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../state/HDFSBackedStateStoreProvider.scala       |  24 +++
 .../state/RocksDBStateStoreProvider.scala          |  68 ++++++++
 .../sql/execution/streaming/state/StateStore.scala |  32 ++++
 .../streaming/state/MemoryStateStore.scala         |  10 ++
 .../RocksDBStateStoreCheckpointFormatV2Suite.scala |  10 ++
 .../RocksDBTimestampEncoderOperationsSuite.scala   | 178 +++++++++++++++++++--
 6 files changed, 313 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 3a5d14f5581a..3303b414ccd3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -111,6 +111,17 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
 
     override def allColumnFamilyNames: Set[String] =
       Set[String](StateStore.DEFAULT_COL_FAMILY_NAME)
+
+    override def prefixScanWithMultiValues(
+        prefixKey: UnsafeRow,
+        colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+      throw 
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", 
"HDFSStateStore")
+    }
+
+    override def iteratorWithMultiValues(
+        colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+      throw 
StateStoreErrors.unsupportedOperationException("multipleValuesPerKey", 
"HDFSStateStore")
+    }
   }
 
   /** Implementation of [[StateStore]] API which is backed by an 
HDFS-compatible file system */
@@ -323,6 +334,19 @@ private[sql] class HDFSBackedStateStoreProvider extends 
StateStoreProvider with
         key: UnsafeRow, values: Array[UnsafeRow], colFamilyName: String): Unit 
= {
       throw StateStoreErrors.unsupportedOperationException("mergeList", 
providerName)
     }
+
+    override def prefixScanWithMultiValues(
+        prefixKey: UnsafeRow,
+        colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+      throw StateStoreErrors.unsupportedOperationException(
+        "prefixScanWithMultiValues", providerName)
+    }
+
+    override def iteratorWithMultiValues(
+        colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+      throw StateStoreErrors.unsupportedOperationException(
+        "iteratorWithMultiValues", providerName)
+    }
   }
 
   def getMetricsForProvider(): Map[String, Long] = synchronized {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 6dde67dd5b66..d8e0440e210d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -458,6 +458,40 @@ private[sql] class RocksDBStateStoreProvider
       }
     }
 
+    override def iteratorWithMultiValues(
+        colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+      validateAndTransitionState(UPDATE)
+      // Note this verify function only verify on the colFamilyName being 
valid,
+      // we are actually doing prefix when useColumnFamilies,
+      // but pass "iteratorWithMultiValues" to throw correct error message
+      verifyColFamilyOperations("iteratorWithMultiValues", colFamilyName)
+
+      val kvEncoder = keyValueEncoderMap.get(colFamilyName)
+      verify(
+        kvEncoder._2.supportsMultipleValuesPerKey,
+        "Multi-value iterator operation requires an encoder" +
+          " which supports multiple values for a single key")
+
+      val rocksDbIter = rocksDB.iterator(colFamilyName)
+
+      val rowPair = new UnsafeRowPair()
+      val iter = rocksDbIter.flatMap { kv =>
+        val keyRow = kvEncoder._1.decodeKey(kv.key)
+        val valueRows = kvEncoder._2.decodeValues(kv.value)
+        valueRows.iterator.map { valueRow =>
+          rowPair.withRows(keyRow, valueRow)
+          if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+            StateStoreProvider.validateStateRowFormat(
+              rowPair.key, keySchema, rowPair.value, valueSchema, 
stateStoreId, storeConf)
+            isValidated = true
+          }
+          rowPair
+        }
+      }
+
+      new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
+    }
+
     override def prefixScan(
         prefixKey: UnsafeRow,
         colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
@@ -481,6 +515,40 @@ private[sql] class RocksDBStateStoreProvider
       new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
     }
 
+    override def prefixScanWithMultiValues(
+        prefixKey: UnsafeRow, colFamilyName: String): 
StateStoreIterator[UnsafeRowPair] = {
+      validateAndTransitionState(UPDATE)
+      verifyColFamilyOperations("prefixScanWithMultiValues", colFamilyName)
+
+      val kvEncoder = keyValueEncoderMap.get(colFamilyName)
+      verify(kvEncoder._1.supportPrefixKeyScan,
+        "prefixScanWithMultiValues requires encoder supporting prefix scan!")
+      verify(
+        kvEncoder._2.supportsMultipleValuesPerKey,
+        "Multi-value iterator operation requires an encoder" +
+          " which supports multiple values for a single key")
+
+      val prefix = kvEncoder._1.encodePrefixKey(prefixKey)
+      val rocksDbIter = rocksDB.prefixScan(prefix, colFamilyName)
+
+      val rowPair = new UnsafeRowPair()
+      val iter = rocksDbIter.flatMap { kv =>
+        val keyRow = kvEncoder._1.decodeKey(kv.key)
+        val valueRows = kvEncoder._2.decodeValues(kv.value)
+        valueRows.iterator.map { valueRow =>
+          rowPair.withRows(keyRow, valueRow)
+          if (!isValidated && rowPair.value != null && !useColumnFamilies) {
+            StateStoreProvider.validateStateRowFormat(
+              rowPair.key, keySchema, rowPair.value, valueSchema, 
stateStoreId, storeConf)
+            isValidated = true
+          }
+          rowPair
+        }
+      }
+
+      new StateStoreIterator(iter, rocksDbIter.closeIfNeeded)
+    }
+
     var checkpointInfo: Option[StateStoreCheckpointInfo] = None
     private var storedMetrics: Option[RocksDBMetrics] = None
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index acb82680d279..e2d7c166a675 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -171,10 +171,31 @@ trait ReadStateStore {
       prefixKey: UnsafeRow,
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): 
StateStoreIterator[UnsafeRowPair]
 
+  /**
+   * Return an iterator containing all the (key, value) pairs which are 
matched with
+   * the given prefix key.
+   *
+   * It is expected to throw exception if Spark calls this method without 
proper key encoding spec.
+   * It is also expected to throw exception if Spark calls this method without 
setting
+   * multipleValuesPerKey as true for the column family.
+   */
+  def prefixScanWithMultiValues(
+      prefixKey: UnsafeRow,
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): 
StateStoreIterator[UnsafeRowPair]
+
   /** Return an iterator containing all the key-value pairs in the StateStore. 
*/
   def iterator(
       colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): 
StateStoreIterator[UnsafeRowPair]
 
+  /**
+   * Return an iterator containing all the key-value pairs in the StateStore.
+   *
+   * It is expected to throw exception if Spark calls this method without 
setting
+   * multipleValuesPerKey as true for the column family.
+   */
+  def iteratorWithMultiValues(
+      colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): 
StateStoreIterator[UnsafeRowPair]
+
   /**
    * Clean up the resource.
    *
@@ -384,6 +405,17 @@ class WrappedReadStateStore(store: StateStore) extends 
ReadStateStore {
   }
 
   override def allColumnFamilyNames: Set[String] = store.allColumnFamilyNames
+
+  override def prefixScanWithMultiValues(
+      prefixKey: UnsafeRow,
+      colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+    store.prefixScanWithMultiValues(prefixKey, colFamilyName)
+  }
+
+  override def iteratorWithMultiValues(
+      colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+    store.iteratorWithMultiValues(colFamilyName)
+  }
 }
 
 /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
index ba5237d91305..cf0cab3c4623 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MemoryStateStore.scala
@@ -93,6 +93,16 @@ class MemoryStateStore extends StateStore() {
     throw new UnsupportedOperationException("Doesn't support multiple values 
per key")
   }
 
+  override def prefixScanWithMultiValues(
+      prefixKey: UnsafeRow, colFamilyName: String): 
StateStoreIterator[UnsafeRowPair] = {
+    throw new UnsupportedOperationException("Doesn't support prefix scan with 
multiple values!")
+  }
+
+  override def iteratorWithMultiValues(
+      colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+    throw new UnsupportedOperationException("Doesn't support iterator with 
multiple values!")
+  }
+
   override def getStateStoreCheckpointInfo(): StateStoreCheckpointInfo = {
     StateStoreCheckpointInfo(id.partitionId, version + 1, None, None)
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
index 4d7443870238..d26f996b5151 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala
@@ -166,6 +166,16 @@ case class CkptIdCollectingStateStoreWrapper(innerStore: 
StateStore) extends Sta
     ret
   }
   override def hasCommitted: Boolean = innerStore.hasCommitted
+
+  override def prefixScanWithMultiValues(
+      prefixKey: UnsafeRow, colFamilyName: String): 
StateStoreIterator[UnsafeRowPair] = {
+    innerStore.prefixScanWithMultiValues(prefixKey, colFamilyName)
+  }
+
+  override def iteratorWithMultiValues(
+      colFamilyName: String): StateStoreIterator[UnsafeRowPair] = {
+    innerStore.iteratorWithMultiValues(colFamilyName)
+  }
 }
 
 class CkptIdCollectingStateStoreProviderWrapper extends StateStoreProvider {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
index 498ac7db20b6..a65565d8b245 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBTimestampEncoderOperationsSuite.scala
@@ -223,7 +223,8 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
     }
   }
 
-  // TODO: Address the new state format with Avro and enable the test with 
Avro encoding
+  // TODO: [SPARK-55145] Address the new state format with Avro and enable the 
test with Avro
+  //  encoding
   Seq("unsaferow").foreach { encoding =>
     test(s"Event time as prefix: iterator operations (encoding = $encoding)") {
       tryWithProviderResource(
@@ -281,6 +282,70 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
       }
     }
 
+    test(s"Event time as prefix: iterator with multiple values (encoding = 
$encoding)") {
+      tryWithProviderResource(
+        newStoreProviderWithTimestampEncoder(
+          encoderType = "prefix",
+          useMultipleValuesPerKey = true,
+          dataEncoding = encoding)
+      ) { provider =>
+        val store = provider.getStore(0)
+
+        try {
+          // Put multiple values at different event times
+          // Insert in non-sorted order to verify ordering by event time
+          val values2 = Array(valueToRow(200), valueToRow(201))
+          store.putList(keyAndTimestampToRow("key1", 1, 1000L), values2)
+
+          val values1 = Array(valueToRow(100), valueToRow(101))
+          store.putList(keyAndTimestampToRow("key1", 1, -3000L), values1)
+
+          val values3 = Array(valueToRow(300), valueToRow(301))
+          store.putList(keyAndTimestampToRow("key2", 1, 2000L), values3)
+
+          // Test iteratorWithMultiValues
+          val iterator = store.iteratorWithMultiValues()
+          val results = iterator.map { pair =>
+            assert(pair.key.numFields() === 3) // key fields + timestamp
+
+            val keyStr = pair.key.getString(0)
+            val partitionId = pair.key.getInt(1)
+            // The timestamp will be placed at the end of the key row.
+            val timestamp = pair.key.getLong(2)
+            val value = pair.value.getInt(0)
+            (keyStr, partitionId, timestamp, value)
+          }.toList
+
+          iterator.close()
+
+          // Should return all individual values across different event times
+          assert(results.length === 6) // 2 values at each of 3 event times
+
+          // Verify results are ordered by event time (ascending)
+          val eventTimes = results.map(_._3)
+          val distinctEventTimes = eventTimes.distinct
+          assert(
+            distinctEventTimes === Seq(-3000L, 1000L, 2000L),
+            "Results should be ordered by event time"
+          )
+
+          // Verify the first 2 results are from time -3000L
+          assert(results.take(2).forall(_._3 === -3000L))
+          assert(results.take(2).map(_._4).toSet === Set(100, 101))
+
+          // Verify the next 2 results are from time 1000L
+          assert(results.slice(2, 4).forall(_._3 === 1000L))
+          assert(results.slice(2, 4).map(_._4).toSet === Set(200, 201))
+
+          // Verify the last 2 results are from time 2000L
+          assert(results.slice(4, 6).forall(_._3 === 2000L))
+          assert(results.slice(4, 6).map(_._4).toSet === Set(300, 301))
+        } finally {
+          store.abort()
+        }
+      }
+    }
+
     test(s"Event time as postfix: prefix scan operations (encoding = 
$encoding)") {
       tryWithProviderResource(
         newStoreProviderWithTimestampEncoder(encoderType = "postfix", 
dataEncoding = encoding)
@@ -336,6 +401,81 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
         }
       }
     }
+
+    test(s"Event time as postfix: prefix scan with multiple values (encoding = 
$encoding)") {
+      tryWithProviderResource(
+        newStoreProviderWithTimestampEncoder(
+          encoderType = "postfix",
+          useMultipleValuesPerKey = true,
+          dataEncoding = encoding
+        )
+      ) { provider =>
+        val store = provider.getStore(0)
+
+        try {
+          // Put multiple values for the same key at different event times
+
+          // Insert in non-sorted order to verify ordering by event time
+          val values2 = Array(valueToRow(200), valueToRow(201))
+          store.putList(keyAndTimestampToRow("key1", 1, 1000L), values2)
+
+          val values1 = Array(valueToRow(100), valueToRow(101))
+          store.putList(keyAndTimestampToRow("key1", 1, -3000L), values1)
+
+          val values3 = Array(valueToRow(300), valueToRow(301))
+          store.putList(keyAndTimestampToRow("key1", 1, 2000L), values3)
+
+          // Different key - should not be returned
+          val values4 = Array(valueToRow(400))
+          store.putList(keyAndTimestampToRow("key2", 1, 1500L), values4)
+
+          // Test prefixScanWithMultiValues - pass complete key to find all 
event times
+          val key1 = keyToRow("key1", 1)
+          val iterator = store.prefixScanWithMultiValues(key1)
+
+          val results = iterator.map { pair =>
+            assert(pair.key.numFields() === 3) // key fields + timestamp
+
+            val keyStr = pair.key.getString(0)
+            val partitionId = pair.key.getInt(1)
+            // The timestamp will be placed at the end of the key row.
+            val timestamp = pair.key.getLong(2)
+            val value = pair.value.getInt(0)
+            (keyStr, partitionId, timestamp, value)
+          }.toList
+          iterator.close()
+
+          // Should return all individual values for key1 across different 
event times
+          assert(results.length === 6) // 2 values at each of 3 event times
+
+          // Verify results are ordered by event time (ascending)
+          // Group by event time to verify ordering
+          val eventTimes = results.map(_._3)
+          val distinctEventTimes = eventTimes.distinct
+          assert(
+            distinctEventTimes === Seq(-3000L, 1000L, 2000L),
+            "Results should be ordered by event time"
+          )
+
+          // Verify the first 2 results are from time -3000L
+          assert(results.take(2).forall(_._3 === -3000L))
+          assert(results.take(2).map(_._4).toSet === Set(100, 101))
+
+          // Verify the next 2 results are from time 1000L
+          assert(results.slice(2, 4).forall(_._3 === 1000L))
+          assert(results.slice(2, 4).map(_._4).toSet === Set(200, 201))
+
+          // Verify the last 2 results are from time 2000L
+          assert(results.slice(4, 6).forall(_._3 === 2000L))
+          assert(results.slice(4, 6).map(_._4).toSet === Set(300, 301))
+
+          // Should not contain key2
+          assert(!results.exists(_._1 == "key2"))
+        } finally {
+          store.abort()
+        }
+      }
+    }
   }
 
   // Diverse set of timestamps that exercise binary lexicographic encoding 
edge cases,
@@ -351,15 +491,19 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
    * diverse timestamp values that exercise binary lexicographic encoding edge 
cases.
    *
    * @param encoderType "prefix" or "postfix"
+   * @param useMultipleValuesPerKey whether to store multiple values per (key, 
timestamp)
    * @param encoding data encoding format (e.g. "unsaferow")
    */
   private def testDiverseTimestampOrdering(
       encoderType: String,
+      useMultipleValuesPerKey: Boolean,
       encoding: String): Unit = {
+    val valuesPerKey = if (useMultipleValuesPerKey) 2 else 1
+
     tryWithProviderResource(
       newStoreProviderWithTimestampEncoder(
         encoderType = encoderType,
-        useMultipleValuesPerKey = false,
+        useMultipleValuesPerKey = useMultipleValuesPerKey,
         dataEncoding = encoding)
     ) { provider =>
       val store = provider.getStore(0)
@@ -368,7 +512,12 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
         // Insert diverse timestamps in non-sorted order
         diverseTimestamps.zipWithIndex.foreach { case (ts, idx) =>
           val keyRow = keyAndTimestampToRow("key1", 1, ts)
-          store.put(keyRow, valueToRow(idx))
+          if (useMultipleValuesPerKey) {
+            val values = Array(valueToRow(idx * 10), valueToRow(idx * 10 + 1))
+            store.putList(keyRow, values)
+          } else {
+            store.put(keyRow, valueToRow(idx))
+          }
         }
 
         // For postfix encoder, add a different key to verify prefix scan 
isolation
@@ -380,16 +529,24 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
         val iter = encoderType match {
           // For prefix encoder, we use iterator
           case "prefix" =>
-            store.iterator()
+            if (useMultipleValuesPerKey) {
+              store.iteratorWithMultiValues()
+            } else {
+              store.iterator()
+            }
           // For postfix encoder, we use prefix scan with ("key1", 1) as the 
prefix key
           case "postfix" =>
-            store.prefixScan(keyToRow("key1", 1))
+            if (useMultipleValuesPerKey) {
+              store.prefixScanWithMultiValues(keyToRow("key1", 1))
+            } else {
+              store.prefixScan(keyToRow("key1", 1))
+            }
         }
 
         val results = iter.map(_.key.getLong(2)).toList
         iter.close()
 
-        assert(results.length === diverseTimestamps.length)
+        assert(results.length === diverseTimestamps.length * valuesPerKey)
 
         // Verify event times are in ascending order
         val distinctEventTimes = results.distinct
@@ -405,9 +562,12 @@ class RocksDBTimestampEncoderOperationsSuite extends 
SharedSparkSession
   //  encoding
   Seq("unsaferow").foreach { encoding =>
     Seq("prefix", "postfix").foreach { encoderType =>
-      test(s"Event time as $encoderType: ordering with diverse timestamps" +
-        s" (encoding = $encoding)") {
-        testDiverseTimestampOrdering(encoderType, encoding)
+      Seq(false, true).foreach { useMultipleValuesPerKey =>
+        val multiValueSuffix = if (useMultipleValuesPerKey) " and multiple 
values" else ""
+        test(s"Event time as $encoderType: ordering with diverse timestamps" +
+          s"$multiValueSuffix (encoding = $encoding)") {
+          testDiverseTimestampOrdering(encoderType, useMultipleValuesPerKey, 
encoding)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to