This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d6ca11e93535 [SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms
d6ca11e93535 is described below

commit d6ca11e9353565c8c846b7573e5727a995732927
Author: Jonathan Albrecht <[email protected]>
AuthorDate: Fri Feb 21 07:12:11 2025 +0900

    [SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on big endian platforms
    
    ### What changes were proposed in this pull request?
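    Skip the FlatMapGroupsWithState tests that use timeouts with state format version 1, as well as the StateDataSourceReadSuite test "flatMapGroupsWithState, state ver 1", on big endian platforms.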
    
    ### Why are the changes needed?
    The timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead of LongType, which breaks serialization on big endian platforms. Correcting the type would be a breaking change to the state schema, so the affected tests are skipped instead.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested with existing tests on amd64 (little endian) and s390x (big endian).
    
    Below are the test results from s390x:
    
    ```
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
    --
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 2
    --
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore) !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
    --
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:471)
    --
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore)
    --
    - flatMapGroupsWithState - streaming with processing time timeout - state format version 2 (RocksDBStateStore with changelog checkpointing)
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore) !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED !!!
      FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)) was false (FlatMapGroupsWithStateSuite.scala:539)
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore)
    --
    - flatMapGroupsWithState - streaming w/ event time timeout + watermark - state format version 2 (RocksDBStateStore with changelog checkpointing)
    --
    - flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
      java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
    --
    - flatMapGroupsWithState, state ver 2
    --
    - flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
      java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
    --
    - flatMapGroupsWithState, state ver 2
    --
    - flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
      java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN) was false (StateDataSourceReadSuite.scala:802)
    --
    - flatMapGroupsWithState, state ver 2
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #49811 from jonathan-albrecht-ibm/master-endian-flatMapGroups.
    
    Authored-by: Jonathan Albrecht <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../datasources/v2/state/StateDataSourceReadSuite.scala     |  6 ++++++
 .../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala   | 13 +++++++++++++
 2 files changed, 19 insertions(+)

diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 4a274d51b62c..fca7d16012ce 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.datasources.v2.state
 
 import java.io.{File, FileWriter}
+import java.nio.ByteOrder
 
 import org.apache.hadoop.conf.Configuration
 import org.scalatest.Assertions
@@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends 
StateDataSourceTestBase with Ass
   }
 
   test("flatMapGroupsWithState, state ver 1") {
+    // Skip this test on big endian platforms because the 
timestampTimeoutAttribute of
+    // StateManagerImplV1 is declared as IntegerType instead of LongType which 
breaks
+    // serialization on big endian. This can't be fixed because it would be a 
breaking
+    // schema change.
+    assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN))
     testFlatMapGroupsWithState(1)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f1feb62b7622..d785cc1a7f44 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.nio.ByteOrder
 import java.sql.Timestamp
 
 import org.apache.commons.io.FileUtils
@@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest {
     checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF())
   }
 
+  // Skip the v1 tests with timeout on big endian platforms because the
+  // timestampTimeoutAttribute of StateManagerImplV1 is declared as 
IntegerType instead
+  // of LongType which breaks serialization on big endian. This can't be fixed 
because it
+  // would be a breaking schema change.
+  def isStateFormatSupported(stateFormatVersion: Int): Boolean = {
+    stateFormatVersion != 1 || 
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)
+  }
+
   testWithAllStateVersions("flatMapGroupsWithState - streaming with processing 
time timeout") {
+    assume(
+      
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
     // Function to maintain the count as state and set the proc. time timeout 
delay of 10 seconds.
     // It returns the count if changed, or -1 if the state was removed by 
timeout.
     val stateFunc = (key: String, values: Iterator[String], state: 
GroupState[RunningCount]) => {
@@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends 
StateStoreMetricsTest {
   }
 
   testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time 
timeout + watermark") {
+    assume(
+      
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
     val inputData = MemoryStream[(String, Int)]
     val result =
       inputData.toDS()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to