This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d6ca11e93535 [SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState
tests with timeout on big endian platforms
d6ca11e93535 is described below
commit d6ca11e9353565c8c846b7573e5727a995732927
Author: Jonathan Albrecht <[email protected]>
AuthorDate: Fri Feb 21 07:12:11 2025 +0900
[SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on
big endian platforms
### What changes were proposed in this pull request?
Skip the v1 FlatMapGroupsWithState tests with timeout on big endian
platforms.
### Why are the changes needed?
The timestampTimeoutAttribute of StateManagerImplV1 is declared as
IntegerType instead of LongType, which breaks serialization on big-endian
platforms. This can't be fixed because it would be a breaking schema change,
so skip the tests instead.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with the existing tests on amd64 (little endian) and s390x (big endian).
Below is the test result from s390x:
```
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 (RocksDBStateStore) !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED
!!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 (RocksDBStateStore) !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 (RocksDBStateStore with changelog checkpointing) !!!
CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49811 from jonathan-albrecht-ibm/master-endian-flatMapGroups.
Authored-by: Jonathan Albrecht <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../datasources/v2/state/StateDataSourceReadSuite.scala | 6 ++++++
.../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala | 13 +++++++++++++
2 files changed, 19 insertions(+)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 4a274d51b62c..fca7d16012ce 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.v2.state
import java.io.{File, FileWriter}
+import java.nio.ByteOrder
import org.apache.hadoop.conf.Configuration
import org.scalatest.Assertions
@@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
test("flatMapGroupsWithState, state ver 1") {
+ // Skip this test on big endian platforms because the
timestampTimeoutAttribute of
+ // StateManagerImplV1 is declared as IntegerType instead of LongType which
breaks
+ // serialization on big endian. This can't be fixed because it would be a
breaking
+ // schema change.
+ assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN))
testFlatMapGroupsWithState(1)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f1feb62b7622..d785cc1a7f44 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming
import java.io.File
+import java.nio.ByteOrder
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
@@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends
StateStoreMetricsTest {
checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF())
}
+ // Skip the v1 tests with timeout on big endian platforms because the
+ // timestampTimeoutAttribute of StateManagerImplV1 is declared as
IntegerType instead
+ // of LongType which breaks serialization on big endian. This can't be fixed
because it
+ // would be a breaking schema change.
+ def isStateFormatSupported(stateFormatVersion: Int): Boolean = {
+ stateFormatVersion != 1 ||
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)
+ }
+
testWithAllStateVersions("flatMapGroupsWithState - streaming with processing
time timeout") {
+ assume(
+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
// Function to maintain the count as state and set the proc. time timeout
delay of 10 seconds.
// It returns the count if changed, or -1 if the state was removed by
timeout.
val stateFunc = (key: String, values: Iterator[String], state:
GroupState[RunningCount]) => {
@@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends
StateStoreMetricsTest {
}
testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time
timeout + watermark") {
+ assume(
+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
val inputData = MemoryStream[(String, Int)]
val result =
inputData.toDS()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]