This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ea215279b0a [SPARK-39264][SS] Fix type check and conversion to longOffset for awaitOffset fix
ea215279b0a is described below
commit ea215279b0a4785d48723f5f24c96b8d7d9aa355
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Tue May 24 13:28:04 2022 +0900
[SPARK-39264][SS] Fix type check and conversion to longOffset for awaitOffset fix
### What changes were proposed in this pull request?
Fix the type check and conversion to LongOffset in the awaitOffset fix, based on discussion and comments from alex-balikov.
### Why are the changes needed?
To ensure type safety when comparing offsets and to avoid type-mismatch bugs.
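The hazard being fixed: the old code checked only the type of the incoming offset with `isInstanceOf`, then converted the committed offset via `toString.toLong`, which can fail or silently mis-compare if the committed offset is not a LongOffset. Matching on both sides at once makes the type check explicit. Below is a minimal, self-contained sketch of that idea; the real code lives in StreamExecution and uses Spark's Offset hierarchy, while `notYetReached` and the simplified `LongOffset` case class here are hypothetical stand-ins for illustration.

```scala
// Simplified stand-in for org.apache.spark.sql.execution.streaming.LongOffset.
case class LongOffset(offset: Long)

// Returns true while the committed offset has not yet reached the target.
// Pattern matching on the pair makes both types explicit: the numeric
// comparison only fires when BOTH sides are LongOffset, otherwise we fall
// back to an equality check, and an absent committed offset means "keep
// waiting".
def notYetReached(committed: Option[Any], target: Any): Boolean =
  (committed, target) match {
    case (Some(LongOffset(localOffVal)), LongOffset(newOffVal)) =>
      localOffVal < newOffVal
    case (Some(localOff), newOff) => localOff != newOff
    case (None, _)                => true
  }

// Usage: numeric comparison for LongOffsets, equality otherwise.
assert(notYetReached(Some(LongOffset(3L)), LongOffset(5L)))   // 3 < 5
assert(!notYetReached(Some(LongOffset(5L)), LongOffset(5L)))  // reached
assert(notYetReached(None, LongOffset(5L)))                   // nothing committed yet
```

Compared with `isInstanceOf`/`asInstanceOf` plus `toString.toLong`, the match expression lets the compiler drive the destructuring, so no unchecked cast or string round-trip is needed.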
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This change only makes the type checks explicit. Ran the existing tests and verified that they pass. Also verified that the pattern-matching change works by running the test for 100 iterations a few times.
```
[info] RateStreamProviderSuite:
15:37:58.700 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[info] - RateStreamProvider in registry (438 milliseconds)
[info] - compatible with old path in registry (1 millisecond)
15:38:00.958 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - microbatch - basic (2 seconds, 346 milliseconds)
15:38:02.074 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
15:38:04.391 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - microbatch - restart (4 seconds, 294 milliseconds)
15:38:06.450 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - microbatch - uniform distribution of event timestamps (547 milliseconds)
[info] - microbatch - infer offsets (90 milliseconds)
[info] - microbatch - predetermined batch size (74 milliseconds)
[info] - microbatch - data read (73 milliseconds)
[info] - valueAtSecond (0 milliseconds)
15:38:07.243 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - rampUpTime (1 second, 633 milliseconds)
15:38:08.806 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - numPartitions (924 milliseconds)
15:38:09.702 WARN org.apache.spark.sql.execution.streaming.ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
[info] - overflow (241 milliseconds)
[info] - illegal option values (3 milliseconds)
[info] - user-specified schema given (8 milliseconds)
[info] - continuous data (1 second, 12 milliseconds)
15:38:11.035 WARN org.apache.spark.sql.execution.streaming.sources.RateStreamProviderSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.execution.streaming.sources.RateStreamProviderSuite, threads: rpc-boss-3-1 (daemon=true), shuffle-boss-6-1 (daemon=true), state-store-maintenance-task (daemon=true) =====
[info] Run completed in 13 seconds, 606 milliseconds.
[info] Total number of tests run: 15
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 15, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
Closes #36642 from anishshri-db/task/SPARK-39264.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../apache/spark/sql/execution/streaming/StreamExecution.scala | 10 ++++------
1 file changed, 4 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ae99743baf3..88896c55455 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -449,12 +449,10 @@ abstract class StreamExecution(
           // after the stream has moved past the expected newOffset or if committedOffsets
           // changed after notify. In this case, its safe to exit, since at-least the given
           // Offset has been reached and the equality condition might never be met.
-          if (!localCommittedOffsets.contains(source)) {
-            true
-          } else if (newOffset.isInstanceOf[LongOffset]) {
-            localCommittedOffsets(source).toString.toLong < newOffset.asInstanceOf[LongOffset].offset
-          } else {
-            localCommittedOffsets(source) != newOffset
+          (localCommittedOffsets.get(source), newOffset) match {
+            case (Some(LongOffset(localOffVal)), LongOffset(newOffVal)) => localOffVal < newOffVal
+            case (Some(localOff), newOff) => localOff != newOff
+            case (None, newOff) => true
           }
         }
       }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]