Repository: spark
Updated Branches:
  refs/heads/branch-2.2 4c212eed1 -> 86cd3c088


[SPARK-21464][SS] Minimize deprecation warnings caused by ProcessingTime class

## What changes were proposed in this pull request?

Use of the `ProcessingTime` class was deprecated in favor of
`Trigger.ProcessingTime` in Spark 2.2. However, remaining uses of
`ProcessingTime` still cause deprecation warnings during compilation. These
cannot be avoided entirely: even though the class is deprecated as a public
API, `ProcessingTime` instances are used internally in `TriggerExecutor`. This
PR minimizes the warnings by removing uses of `ProcessingTime` from the tests
as much as possible.
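
For readers migrating their own code, here is a minimal standalone sketch of the before/after API usage. It is not part of this patch; the object name is illustrative, and it assumes Spark 2.2+ on the classpath:

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}

object TriggerMigrationSketch {
  def main(args: Array[String]): Unit = {
    // Before (deprecated since 2.2, emits a compiler warning):
    //   val t = ProcessingTime("10 seconds")

    // After: the equivalent factory methods on Trigger.
    val byString = Trigger.ProcessingTime("10 seconds")
    val byUnits = Trigger.ProcessingTime(10, TimeUnit.SECONDS)

    // The factories still return ProcessingTime instances under the hood,
    // which is why the deprecated class cannot be removed outright.
    println(byString.asInstanceOf[ProcessingTime].intervalMs) // 10000
    println(byUnits.asInstanceOf[ProcessingTime].intervalMs)  // 10000
  }
}
```

In a streaming query the trigger is attached the same way in both styles, e.g. `df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))`, which compiles without a deprecation warning.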

## How was this patch tested?
Existing tests.

Author: Tathagata Das <[email protected]>

Closes #18678 from tdas/SPARK-21464.

(cherry picked from commit 70fe99dc62ef636a99bcb8a580ad4de4dca95181)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/86cd3c08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/86cd3c08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/86cd3c08

Branch: refs/heads/branch-2.2
Commit: 86cd3c08871618441c0c297da0f48ac284595697
Parents: 4c212ee
Author: Tathagata Das <[email protected]>
Authored: Wed Jul 19 11:02:07 2017 -0700
Committer: Tathagata Das <[email protected]>
Committed: Wed Jul 19 11:02:18 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/ProcessingTimeSuite.scala  | 24 +++++++++++---------
 .../streaming/FlatMapGroupsWithStateSuite.scala |  6 ++---
 .../streaming/StreamingAggregationSuite.scala   |  8 +++----
 3 files changed, 20 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/86cd3c08/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 52c2007..623a1b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,20 +22,22 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.ProcessingTime
+import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
 
 class ProcessingTimeSuite extends SparkFunSuite {
 
   test("create") {
-    assert(ProcessingTime(10.seconds).intervalMs === 10 * 1000)
-    assert(ProcessingTime.create(10, TimeUnit.SECONDS).intervalMs === 10 * 1000)
-    assert(ProcessingTime("1 minute").intervalMs === 60 * 1000)
-    assert(ProcessingTime("interval 1 minute").intervalMs === 60 * 1000)
-
-    intercept[IllegalArgumentException] { ProcessingTime(null: String) }
-    intercept[IllegalArgumentException] { ProcessingTime("") }
-    intercept[IllegalArgumentException] { ProcessingTime("invalid") }
-    intercept[IllegalArgumentException] { ProcessingTime("1 month") }
-    intercept[IllegalArgumentException] { ProcessingTime("1 year") }
+    def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
+
+    assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime("1 minute")) === 60 * 1000)
+    assert(getIntervalMs(Trigger.ProcessingTime("interval 1 minute")) === 60 * 
1000)
+
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime(null: String) }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("invalid") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 month") }
+    intercept[IllegalArgumentException] { Trigger.ProcessingTime("1 year") }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/86cd3c08/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 6bb9408..d737064 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -680,7 +680,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(ProcessingTime("1 second"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
       AddData(inputData, "a"),
       AdvanceManualClock(1 * 1000),
       CheckLastBatch(("a", "1")),
@@ -745,7 +745,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
         .flatMapGroupsWithState(Update, EventTimeTimeout)(stateFunc)
 
     testStream(result, Update)(
-      StartStream(ProcessingTime("1 second")),
+      StartStream(Trigger.ProcessingTime("1 second")),
       AddData(inputData, ("a", 11), ("a", 13), ("a", 15)), // Set timeout 
timestamp of ...
       CheckLastBatch(("a", 15)),                           // "a" to 15 + 5 = 
20s, watermark to 5s
       AddData(inputData, ("a", 4)),       // Add data older than watermark for 
"a"
@@ -917,7 +917,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
           .flatMapGroupsWithState(Update, ProcessingTimeTimeout)(stateFunc)
 
       testStream(result, Update)(
-        StartStream(ProcessingTime("1 second"), triggerClock = clock),
+        StartStream(Trigger.ProcessingTime("1 second"), triggerClock = clock),
         AddData(inputData, ("a", 1L)),
         AdvanceManualClock(1 * 1000),
         CheckLastBatch(("a", "1"))

http://git-wip-us.apache.org/repos/asf/spark/blob/86cd3c08/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
index 4345a70..b6e82b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
@@ -267,7 +267,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         .where('value >= current_timestamp().cast("long") - 10L)
 
     testStream(aggregated, Complete)(
-      StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
 
       // advance clock to 10 seconds, all keys retained
       AddData(inputData, 0L, 5L, 5L, 10L),
@@ -294,7 +294,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         clock.advance(60 * 1000L)
         true
       },
-      StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 seconds"), triggerClock = clock),
       // The commit log blown, causing the last batch to re-run
       CheckLastBatch((20L, 1), (85L, 1)),
       AssertOnQuery { q =>
@@ -322,7 +322,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         .where($"value".cast("date") >= date_sub(current_date(), 10))
         .select(($"value".cast("long") / 
DateTimeUtils.SECONDS_PER_DAY).cast("long"), $"count(1)")
     testStream(aggregated, Complete)(
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
       // advance clock to 10 days, should retain all keys
       AddData(inputData, 0L, 5L, 5L, 10L),
       AdvanceManualClock(DateTimeUtils.MILLIS_PER_DAY * 10),
@@ -346,7 +346,7 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
         clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
         true
       },
-      StartStream(ProcessingTime("10 day"), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime("10 day"), triggerClock = clock),
       // Commit log blown, causing a re-run of the last batch
       CheckLastBatch((20L, 1), (85L, 1)),
 

