This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new fb6e8ab2d2da [SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState
tests with timeout on big endian platforms
fb6e8ab2d2da is described below
commit fb6e8ab2d2daed465a64cc8d3b02863e48651d34
Author: Jonathan Albrecht <[email protected]>
AuthorDate: Fri Feb 21 07:12:11 2025 +0900
[SPARK-51092][SS] Skip the v1 FlatMapGroupsWithState tests with timeout on
big endian platforms
### What changes were proposed in this pull request?
Skip the v1 FlatMapGroupsWithState tests with timeout on big endian
platforms.
### Why are the changes needed?
The timestampTimeoutAttribute of StateManagerImplV1 is declared as
IntegerType instead of LongType, which breaks serialization on big endian
platforms. This can't be fixed because it would be a breaking schema change,
so the tests are skipped on those platforms instead.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with existing tests on amd64 (little endian) and s390x (big endian).
Below is the test result from s390x:
```
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 (RocksDBStateStore) !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 1 (RocksDBStateStore with changelog checkpointing) !!! CANCELED
!!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:471)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming with processing time timeout - state
format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 (RocksDBStateStore) !!! CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 1 (RocksDBStateStore with changelog checkpointing) !!!
CANCELED !!!
FlatMapGroupsWithStateSuite.this.isStateFormatSupported(FlatMapGroupsWithStateSuite.this.sqlConf.getConf[Int](org.apache.spark.sql.internal.SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION))
was false (FlatMapGroupsWithStateSuite.scala:539)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2 (RocksDBStateStore)
--
- flatMapGroupsWithState - streaming w/ event time timeout + watermark -
state format version 2 (RocksDBStateStore with changelog checkpointing)
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
--
- flatMapGroupsWithState, state ver 1 !!! CANCELED !!!
java.nio.ByteOrder.nativeOrder().equals(java.nio.ByteOrder.LITTLE_ENDIAN)
was false (StateDataSourceReadSuite.scala:802)
--
- flatMapGroupsWithState, state ver 2
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #49811 from jonathan-albrecht-ibm/master-endian-flatMapGroups.
Authored-by: Jonathan Albrecht <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit d6ca11e9353565c8c846b7573e5727a995732927)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../datasources/v2/state/StateDataSourceReadSuite.scala | 6 ++++++
.../spark/sql/streaming/FlatMapGroupsWithStateSuite.scala | 13 +++++++++++++
2 files changed, 19 insertions(+)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
index 4a274d51b62c..fca7d16012ce 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.datasources.v2.state
import java.io.{File, FileWriter}
+import java.nio.ByteOrder
import org.apache.hadoop.conf.Configuration
import org.scalatest.Assertions
@@ -794,6 +795,11 @@ abstract class StateDataSourceReadSuite extends
StateDataSourceTestBase with Ass
}
test("flatMapGroupsWithState, state ver 1") {
+ // Skip this test on big endian platforms because the timestampTimeoutAttribute of
+ // StateManagerImplV1 is declared as IntegerType instead of LongType which breaks
+ // serialization on big endian. This can't be fixed because it would be a breaking
+ // schema change.
+ assume(ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN))
testFlatMapGroupsWithState(1)
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index f1feb62b7622..d785cc1a7f44 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.streaming
import java.io.File
+import java.nio.ByteOrder
import java.sql.Timestamp
import org.apache.commons.io.FileUtils
@@ -458,7 +459,17 @@ class FlatMapGroupsWithStateSuite extends
StateStoreMetricsTest {
checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF())
}
+ // Skip the v1 tests with timeout on big endian platforms because the
+ // timestampTimeoutAttribute of StateManagerImplV1 is declared as IntegerType instead
+ // of LongType which breaks serialization on big endian. This can't be fixed because it
+ // would be a breaking schema change.
+ def isStateFormatSupported(stateFormatVersion: Int): Boolean = {
+ stateFormatVersion != 1 ||
ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)
+ }
+
testWithAllStateVersions("flatMapGroupsWithState - streaming with processing
time timeout") {
+ assume(
+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
// Function to maintain the count as state and set the proc. time timeout
delay of 10 seconds.
// It returns the count if changed, or -1 if the state was removed by
timeout.
val stateFunc = (key: String, values: Iterator[String], state:
GroupState[RunningCount]) => {
@@ -526,6 +537,8 @@ class FlatMapGroupsWithStateSuite extends
StateStoreMetricsTest {
}
testWithAllStateVersions("flatMapGroupsWithState - streaming w/ event time
timeout + watermark") {
+ assume(
+
isStateFormatSupported(sqlConf.getConf(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)))
val inputData = MemoryStream[(String, Int)]
val result =
inputData.toDS()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]